diff --git a/neighsyncd/Makefile.am b/neighsyncd/Makefile.am
index 0bfc7c385c6c..361d6884f823 100644
--- a/neighsyncd/Makefile.am
+++ b/neighsyncd/Makefile.am
@@ -1,4 +1,4 @@
-INCLUDES = -I $(top_srcdir)
+INCLUDES = -I $(top_srcdir) -I $(top_srcdir)/warmrestart
 
 bin_PROGRAMS = neighsyncd
 
@@ -8,7 +8,7 @@ else
 DBGFLAGS = -g
 endif
 
-neighsyncd_SOURCES = neighsyncd.cpp neighsync.cpp
+neighsyncd_SOURCES = neighsyncd.cpp neighsync.cpp $(top_srcdir)/warmrestart/warm_restart.cpp $(top_srcdir)/warmrestart/warmRestartAssist.cpp
 
 neighsyncd_CFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON)
 neighsyncd_CPPFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON)
diff --git a/neighsyncd/neighsync.cpp b/neighsyncd/neighsync.cpp
index 7a5a55f204fa..d0e36cc5fffe 100644
--- a/neighsyncd/neighsync.cpp
+++ b/neighsyncd/neighsync.cpp
@@ -11,12 +11,14 @@
 #include "linkcache.h"
 #include "neighsync.h"
+#include "warm_restart.h"
 
 using namespace std;
 using namespace swss;
 
-NeighSync::NeighSync(DBConnector *db) :
-    m_neighTable(db, APP_NEIGH_TABLE_NAME)
+NeighSync::NeighSync(RedisPipeline *pipelineAppDB) :
+    m_neighTable(pipelineAppDB, APP_NEIGH_TABLE_NAME),
+    m_AppRestartAssist(pipelineAppDB, "neighsyncd", "swss", &m_neighTable, DEFAULT_NEIGHSYNC_WARMSTART_TIMER)
 {
 }
 
@@ -52,18 +54,32 @@ void NeighSync::onMsg(int nlmsg_type, struct nl_object *obj)
     key+= ipStr;
 
     int state = rtnl_neigh_get_state(neigh);
+    bool delete_key = false;
     if ((nlmsg_type == RTM_DELNEIGH) || (state == NUD_INCOMPLETE) ||
         (state == NUD_FAILED))
     {
-        m_neighTable.del(key);
-        return;
+        delete_key = true;
     }
 
     nl_addr2str(rtnl_neigh_get_lladdr(neigh), macStr, MAX_ADDR_SIZE);
+
     std::vector<FieldValueTuple> fvVector;
     FieldValueTuple f("family", family);
     FieldValueTuple nh("neigh", macStr);
     fvVector.push_back(nh);
     fvVector.push_back(f);
-    m_neighTable.set(key, fvVector);
+
+    if (m_AppRestartAssist.isWarmStartInProgress())
+    {
+        m_AppRestartAssist.insertToMap(key, fvVector, delete_key);
+    }
+    else
+    {
+        if (delete_key == true)
+        {
+            m_neighTable.del(key);
+            return;
+        }
+        m_neighTable.set(key, fvVector);
+    }
 }
diff --git a/neighsyncd/neighsync.h b/neighsyncd/neighsync.h
index 05c7d073202c..1889e0cadabb 100644
--- a/neighsyncd/neighsync.h
+++ b/neighsyncd/neighsync.h
@@ -4,6 +4,9 @@
 #include "dbconnector.h"
 #include "producerstatetable.h"
 #include "netmsg.h"
+#include "warmRestartAssist.h"
+
+#define DEFAULT_NEIGHSYNC_WARMSTART_TIMER 5
 
 namespace swss {
 
@@ -12,12 +15,18 @@ class NeighSync : public NetMsg
 public:
     enum { MAX_ADDR_SIZE = 64 };
 
-    NeighSync(DBConnector *db);
+    NeighSync(RedisPipeline *pipelineAppDB);
 
     virtual void onMsg(int nlmsg_type, struct nl_object *obj);
 
+    AppRestartAssist *getRestartAssist()
+    {
+        return &m_AppRestartAssist;
+    }
+
 private:
     ProducerStateTable m_neighTable;
+    AppRestartAssist m_AppRestartAssist;
 };
 
 }
diff --git a/neighsyncd/neighsyncd.cpp b/neighsyncd/neighsyncd.cpp
index 3694ba8cbf50..07236a8515e3 100644
--- a/neighsyncd/neighsyncd.cpp
+++ b/neighsyncd/neighsyncd.cpp
@@ -11,8 +11,11 @@ using namespace swss;
 int main(int argc, char **argv)
 {
     Logger::linkToDbNative("neighsyncd");
-    DBConnector db(APPL_DB, DBConnector::DEFAULT_UNIXSOCKET, 0);
-    NeighSync sync(&db);
+
+    DBConnector appDb(APPL_DB, DBConnector::DEFAULT_UNIXSOCKET, 0);
+    RedisPipeline pipelineAppDB(&appDb);
+
+    NeighSync sync(&pipelineAppDB);
 
     NetDispatcher::getInstance().registerMessageHandler(RTM_NEWNEIGH, &sync);
     NetDispatcher::getInstance().registerMessageHandler(RTM_DELNEIGH, &sync);
@@ -29,10 +32,23 @@ int main(int argc, char **argv)
             netlink.dumpRequest(RTM_GETNEIGH);
             s.addSelectable(&netlink);
+            if (sync.getRestartAssist()->isWarmStartInProgress())
+            {
+                sync.getRestartAssist()->readTableToMap();
+                sync.getRestartAssist()->startReconcileTimer(s);
+            }
             while (true)
             {
                 Selectable *temps;
                 s.select(&temps);
+                if (sync.getRestartAssist()->isWarmStartInProgress())
+                {
+                    if (sync.getRestartAssist()->checkReconcileTimer(temps))
+                    {
+                        sync.getRestartAssist()->stopReconcileTimer(s);
+                        sync.getRestartAssist()->reconcile();
+                    }
+                }
             }
         }
     }
     catch (const std::exception& e)
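Taken together, the four changes above wire neighsyncd to the new AppRestartAssist helper: the constructor hands it the APP_DB pipeline and producer table, onMsg() diverts updates into its cache map while a warm start is in progress, and main() restores the old APP_DB content and arms a reconcile timer. The sketch below condenses the same pattern in one place; FooSync, "foosyncd" and "FOO_TABLE" are hypothetical names used only for illustration, while every AppRestartAssist call is the API introduced later in this patch.

```cpp
// Hypothetical adopter of AppRestartAssist; FooSync, "foosyncd" and "FOO_TABLE"
// are illustrative names, not part of this patch.
#include <string>
#include <vector>
#include "dbconnector.h"
#include "producerstatetable.h"
#include "warmRestartAssist.h"

using namespace swss;

class FooSync
{
public:
    FooSync(RedisPipeline *pipelineAppDB) :
        m_table(pipelineAppDB, "FOO_TABLE"),
        m_assist(pipelineAppDB, "foosyncd", "swss", &m_table, 5 /* default reconcile timer, seconds */)
    {
    }

    // Called for every add/update/delete learned from the data source.
    void onUpdate(const std::string &key, std::vector<FieldValueTuple> fv, bool delete_key)
    {
        if (m_assist.isWarmStartInProgress())
        {
            // Warm start: buffer into the cache map; reconcile() flushes it later.
            m_assist.insertToMap(key, fv, delete_key);
        }
        else if (delete_key)
        {
            m_table.del(key);      // normal operation: write through immediately
        }
        else
        {
            m_table.set(key, fv);
        }
    }

    AppRestartAssist *getRestartAssist() { return &m_assist; }

private:
    ProducerStateTable m_table;
    AppRestartAssist m_assist;
};
```

Note that, as written in this patch, AppRestartAssist's internal Table member is still constructed against APP_NEIGH_TABLE_NAME (see warmRestartAssist.cpp below), so reusing the helper verbatim for another table would first need that constructor generalized.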
diff --git a/tests/test_warm_reboot.py b/tests/test_warm_reboot.py
index 50057646bd13..a7062231b38d 100644
--- a/tests/test_warm_reboot.py
+++ b/tests/test_warm_reboot.py
@@ -63,6 +63,21 @@ def swss_app_check_RestartCount_single(state_db, restart_count, name):
             assert int(fv[1]) == restart_count[key] + 1
         elif fv[0] == "state":
             assert fv[1] == "reconciled"
+
+def swss_app_check_warmstart_state(state_db, name, state):
+    warmtbl = swsscommon.Table(state_db, swsscommon.STATE_WARM_RESTART_TABLE_NAME)
+    keys = warmtbl.getKeys()
+    print(keys)
+    assert len(keys) > 0
+    for key in keys:
+        if key != name:
+            continue
+        (status, fvs) = warmtbl.get(key)
+        assert status == True
+        for fv in fvs:
+            if fv[0] == "state":
+                assert fv[1] == state
+
 def create_entry(tbl, key, pairs):
     fvs = swsscommon.FieldValuePairs(pairs)
     tbl.set(key, fvs)
@@ -285,3 +300,386 @@ def test_VlanMgrdWarmRestart(dvs):
     assert status == True
 
     swss_app_check_RestartCount_single(state_db, restart_count, "vlanmgrd")
+
+
+# function to stop neighsyncd service and clear syslog and sairedis records
+def stop_neighsyncd_clear_syslog_sairedis(dvs, save_number):
+    dvs.runcmd(['sh', '-c', 'pkill -x neighsyncd'])
+    dvs.runcmd("cp /var/log/swss/sairedis.rec /var/log/swss/sairedis.rec.back{}".format(save_number))
+    dvs.runcmd(['sh', '-c', '> /var/log/swss/sairedis.rec'])
+    dvs.runcmd("cp /var/log/syslog /var/log/syslog.back{}".format(save_number))
+    dvs.runcmd(['sh', '-c', '> /var/log/syslog'])
+
+def start_neighsyncd(dvs):
+    dvs.runcmd(['sh', '-c', 'supervisorctl start neighsyncd'])
+
+def check_no_neighsyncd_timer(dvs):
+    (exitcode, string) = dvs.runcmd(['sh', '-c', 'grep getWarmStartTimer /var/log/syslog | grep neighsyncd | grep invalid'])
+    assert string.strip() != ""
+
+def check_neighsyncd_timer(dvs, timer_value):
+    (exitcode, num) = dvs.runcmd(['sh', '-c', "grep getWarmStartTimer /var/log/syslog | grep neighsyncd | rev | cut -d ' ' -f 1 | rev"])
+    assert num.strip() == timer_value
+
+# function to check neighbor entry reconciliation status written in syslog
+def check_syslog_for_neighbor_entry(dvs, new_cnt, delete_cnt, iptype):
+    # check reconciliation results (new or delete entries) for ipv4 and ipv6
+    if iptype == "ipv4":
+        (exitcode, num) = dvs.runcmd(['sh', '-c', 'grep neighsyncd /var/log/syslog | grep cache-state:NEW | grep IPv4 | wc -l'])
+        assert num.strip() == str(new_cnt)
+        (exitcode, num) = dvs.runcmd(['sh', '-c', 'grep neighsyncd /var/log/syslog | grep cache-state:DELETE | grep IPv4 | wc -l'])
+        assert num.strip() == str(delete_cnt)
+    elif iptype == "ipv6":
+        (exitcode, num) = dvs.runcmd(['sh', '-c', 'grep neighsyncd /var/log/syslog | grep cache-state:NEW | grep IPv6 | wc -l'])
+        assert num.strip() == str(new_cnt)
+        (exitcode, num) = dvs.runcmd(['sh', '-c', 'grep neighsyncd /var/log/syslog | grep cache-state:DELETE | grep IPv6 | wc -l'])
+        assert num.strip() == str(delete_cnt)
+    else:
+        assert "iptype is unknown" == ""
+
+# function to check sairedis record for neighbor entries
+def check_sairedis_for_neighbor_entry(dvs, create_cnt, set_cnt, remove_cnt):
+    # check create/set/remove operations for neighbor entries during warm restart
+    (exitcode, num) = dvs.runcmd(['sh', '-c', 'grep \|c\| /var/log/swss/sairedis.rec | grep NEIGHBOR_ENTRY | wc -l'])
+    assert num.strip() == str(create_cnt)
+    (exitcode, num) = dvs.runcmd(['sh', '-c', 'grep \|s\| /var/log/swss/sairedis.rec | grep NEIGHBOR_ENTRY | wc -l'])
+    assert num.strip() == str(set_cnt)
+    (exitcode, num) = dvs.runcmd(['sh', '-c', 'grep \|r\| /var/log/swss/sairedis.rec | grep NEIGHBOR_ENTRY | wc -l'])
+    assert num.strip() == str(remove_cnt)
+
+def test_swss_neighbor_syncup(dvs):
+
+    appl_db = swsscommon.DBConnector(swsscommon.APPL_DB, dvs.redis_sock, 0)
+    conf_db = swsscommon.DBConnector(swsscommon.CONFIG_DB, dvs.redis_sock, 0)
+    state_db = swsscommon.DBConnector(swsscommon.STATE_DB, dvs.redis_sock, 0)
+
+    # enable warm restart
+    # TODO: use cfg command to config it
+    create_entry_tbl(
+        conf_db,
+        swsscommon.CFG_WARM_RESTART_TABLE_NAME, "swss",
+        [
+            ("enable", "true"),
+        ]
+    )
+
+    #
+    # Testcase 1:
+    # Add neighbor entries in linux kernel, appDB should get all of them
+    #
+
+    # create neighbor entries (4 ipv4 and 4 ipv6, two each on each interface) in linux kernel
+    intfs = ["Ethernet24", "Ethernet28"]
+    # enable ipv6 on docker
+    dvs.runcmd("sysctl net.ipv6.conf.all.disable_ipv6=0")
+
+    dvs.runcmd("ifconfig {} 24.0.0.1/24 up".format(intfs[0]))
+    dvs.runcmd("ip -6 addr add 2400::1/64 dev {}".format(intfs[0]))
+
+    dvs.runcmd("ifconfig {} 28.0.0.1/24 up".format(intfs[1]))
+    dvs.runcmd("ip -6 addr add 2800::1/64 dev {}".format(intfs[1]))
+
+    ips = ["24.0.0.2", "24.0.0.3", "28.0.0.2", "28.0.0.3"]
+    v6ips = ["2400::2", "2400::3", "2800::2", "2800::3"]
+
+    macs = ["00:00:00:00:24:02", "00:00:00:00:24:03", "00:00:00:00:28:02", "00:00:00:00:28:03"]
+
+    for i in range(len(ips)):
+        dvs.runcmd("ip neigh add {} dev {} lladdr {}".format(ips[i], intfs[i%2], macs[i]))
+
+    for i in range(len(v6ips)):
+        dvs.runcmd("ip -6 neigh add {} dev {} lladdr {}".format(v6ips[i], intfs[i%2], macs[i]))
+
+    time.sleep(1)
+
+    # Check the neighbor entries are inserted correctly
+    db = swsscommon.DBConnector(0, dvs.redis_sock, 0)
+    tbl = swsscommon.Table(db, "NEIGH_TABLE")
+
+    for i in range(len(ips)):
+        (status, fvs) = tbl.get("{}:{}".format(intfs[i%2], ips[i]))
+        assert status == True
+
+        for v in fvs:
+            if v[0] == "neigh":
+                assert v[1] == macs[i]
+            if v[0] == "family":
+                assert v[1] == "IPv4"
+
+    for i in range(len(v6ips)):
+        (status, fvs) = tbl.get("{}:{}".format(intfs[i%2], v6ips[i]))
+        assert status == True
+
+        for v in fvs:
+            if v[0] == "neigh":
+                assert v[1] == macs[i]
+            if v[0] == "family":
+                assert v[1] == "IPv6"
+
+    #
+    # Testcase 2:
+    # Restart neighsyncd without changing neighbor entries; nothing should be sent to appDB or sairedis,
+    # and appDB should be kept the same.
+    #
+
+    # get restart_count
+    restart_count = swss_get_RestartCount(state_db)
+
+    # stop neighsyncd and clear syslog and sairedis.rec
+    stop_neighsyncd_clear_syslog_sairedis(dvs, 1)
+
+    start_neighsyncd(dvs)
+    time.sleep(10)
+
+    # Check the neighbor entries are still in appDB correctly
+    for i in range(len(ips)):
+        (status, fvs) = tbl.get("{}:{}".format(intfs[i%2], ips[i]))
+        assert status == True
+
+        for v in fvs:
+            if v[0] == "neigh":
+                assert v[1] == macs[i]
+            if v[0] == "family":
+                assert v[1] == "IPv4"
+
+    for i in range(len(v6ips)):
+        (status, fvs) = tbl.get("{}:{}".format(intfs[i%2], v6ips[i]))
+        assert status == True
+
+        for v in fvs:
+            if v[0] == "neigh":
+                assert v[1] == macs[i]
+            if v[0] == "family":
+                assert v[1] == "IPv6"
+
+    # check syslog and sairedis.rec file for activities
+    check_syslog_for_neighbor_entry(dvs, 0, 0, "ipv4")
+    check_syslog_for_neighbor_entry(dvs, 0, 0, "ipv6")
+    check_sairedis_for_neighbor_entry(dvs, 0, 0, 0)
+
+    # check restart Count
+    swss_app_check_RestartCount_single(state_db, restart_count, "neighsyncd")
+
+    #
+    # Testcase 3:
+    # Stop neighsyncd, delete the even-numbered ipv4/ipv6 neighbor entries from each interface, then warm start neighsyncd.
+    # neighsyncd is supposed to sync up the entries from the kernel after warm restart.
+    # Note: there was an issue with neighbor delete: the entry is marked FAILED instead of deleted in the kernel,
+    # but a netlink message is still sent so the entry is removed from appDB, so it works ok here;
+    # just that if we want to add the same neighbor again, use "change" instead of "add".
+
+    # get restart_count
+    restart_count = swss_get_RestartCount(state_db)
+
+    # stop neighsyncd and clear syslog and sairedis.rec
+    stop_neighsyncd_clear_syslog_sairedis(dvs, 2)
+
+    # delete the even-numbered ipv4/ipv6 neighbor entries from each interface
+    for i in range(0, len(ips), 2):
+        dvs.runcmd("ip neigh del {} dev {}".format(ips[i], intfs[i%2]))
+
+    for i in range(0, len(v6ips), 2):
+        dvs.runcmd("ip -6 neigh del {} dev {}".format(v6ips[i], intfs[i%2]))
+
+    # start neighsyncd again
+    start_neighsyncd(dvs)
+    time.sleep(10)
+
+    # check ipv4 and ipv6 neighbors
+    for i in range(len(ips)):
+        (status, fvs) = tbl.get("{}:{}".format(intfs[i%2], ips[i]))
+        # should not see deleted neighbor entries
+        if i % 2 == 0:
+            assert status == False
+            continue
+        else:
+            assert status == True
+
+        # undeleted entries should still be there
+        for v in fvs:
+            if v[0] == "neigh":
+                assert v[1] == macs[i]
+            if v[0] == "family":
+                assert v[1] == "IPv4"
+
+    for i in range(len(v6ips)):
+        (status, fvs) = tbl.get("{}:{}".format(intfs[i%2], v6ips[i]))
+        # should not see deleted neighbor entries
+        if i % 2 == 0:
+            assert status == False
+            continue
+        else:
+            assert status == True
+
+        # undeleted entries should still be there
+        for v in fvs:
+            if v[0] == "neigh":
+                assert v[1] == macs[i]
+            if v[0] == "family":
+                assert v[1] == "IPv6"
+
+    # check syslog and sairedis.rec file for activities
+    # 2 deletes each for ipv4 and ipv6
+    # 4 remove actions in sairedis
+    check_syslog_for_neighbor_entry(dvs, 0, 2, "ipv4")
+    check_syslog_for_neighbor_entry(dvs, 0, 2, "ipv6")
+    check_sairedis_for_neighbor_entry(dvs, 0, 0, 4)
+
+    # check restart Count
+    swss_app_check_RestartCount_single(state_db, restart_count, "neighsyncd")
+
+    #
+    # Testcase 4:
+    # Stop neighsyncd, add the even-numbered ipv4/ipv6 neighbor entries back to each interface
+    # (use "change" due to the kernel behaviour), then start neighsyncd.
+    # neighsyncd is supposed to sync up the entries from the kernel after warm restart.
+    # Check that the timer is not retrieved from configDB since it is not configured.
+
+    # get restart_count
+    restart_count = swss_get_RestartCount(state_db)
+
+    # stop neighsyncd and clear syslog and sairedis.rec
+    stop_neighsyncd_clear_syslog_sairedis(dvs, 3)
+
+    # add the even-numbered ipv4/ipv6 neighbor entries back to each interface
+    for i in range(0, len(ips), 2):
+        dvs.runcmd("ip neigh change {} dev {} lladdr {}".format(ips[i], intfs[i%2], macs[i]))
+
+    for i in range(0, len(v6ips), 2):
+        dvs.runcmd("ip -6 neigh change {} dev {} lladdr {}".format(v6ips[i], intfs[i%2], macs[i]))
+
+    # start neighsyncd again
+    start_neighsyncd(dvs)
+    time.sleep(10)
+
+    # no neighsyncd timer configured
+    check_no_neighsyncd_timer(dvs)
+
+    # check ipv4 and ipv6 neighbors, should see all neighbors
+    for i in range(len(ips)):
+        (status, fvs) = tbl.get("{}:{}".format(intfs[i%2], ips[i]))
+        assert status == True
+        for v in fvs:
+            if v[0] == "neigh":
+                assert v[1] == macs[i]
+            if v[0] == "family":
+                assert v[1] == "IPv4"
+
+    for i in range(len(v6ips)):
+        (status, fvs) = tbl.get("{}:{}".format(intfs[i%2], v6ips[i]))
+        assert status == True
+        for v in fvs:
+            if v[0] == "neigh":
+                assert v[1] == macs[i]
+            if v[0] == "family":
+                assert v[1] == "IPv6"
+
+    # check syslog and sairedis.rec file for activities
+    # 2 new entries for ipv4 and ipv6 each
+    # 4 create actions for sairedis
+    check_syslog_for_neighbor_entry(dvs, 2, 0, "ipv4")
+    check_syslog_for_neighbor_entry(dvs, 2, 0, "ipv6")
+    check_sairedis_for_neighbor_entry(dvs, 4, 0, 0)
+
+    # check restart Count
+    swss_app_check_RestartCount_single(state_db, restart_count, "neighsyncd")
+
+    #
+    # Testcase 5:
+    # Even-numbered ipv4/ipv6 neighbors are updated with new MACs.
+    # Odd-numbered ipv4/ipv6 neighbors are removed and added to different interfaces.
+    # neighsyncd should sync everything up after warm restart.
+    # The timer settings are included in this testcase.
+
+    # setup timer in configDB
+    timer_value = "15"
+
+    create_entry_tbl(
+        conf_db,
+        swsscommon.CFG_WARM_RESTART_TABLE_NAME, "swss",
+        [
+            ("neighsyncd_timer", timer_value),
+        ]
+    )
+
+    # get restart_count
+    restart_count = swss_get_RestartCount(state_db)
+
+    # stop neighsyncd and clear syslog and sairedis.rec
+    stop_neighsyncd_clear_syslog_sairedis(dvs, 4)
+
+    # Even-numbered ipv4/ipv6 neighbors are updated with new MACs.
+    # Odd-numbered ipv4/ipv6 neighbors are removed and added to different interfaces.
+    newmacs = ["00:00:00:01:12:02", "00:00:00:01:12:03", "00:00:00:01:16:02", "00:00:00:01:16:03"]
+
+    for i in range(len(ips)):
+        if i % 2 == 0:
+            dvs.runcmd("ip neigh change {} dev {} lladdr {}".format(ips[i], intfs[i%2], newmacs[i]))
+        else:
+            dvs.runcmd("ip neigh del {} dev {}".format(ips[i], intfs[i%2]))
+            dvs.runcmd("ip neigh add {} dev {} lladdr {}".format(ips[i], intfs[1-i%2], macs[i]))
+
+    for i in range(len(v6ips)):
+        if i % 2 == 0:
+            dvs.runcmd("ip -6 neigh change {} dev {} lladdr {}".format(v6ips[i], intfs[i%2], newmacs[i]))
+        else:
+            dvs.runcmd("ip -6 neigh del {} dev {}".format(v6ips[i], intfs[i%2]))
+            dvs.runcmd("ip -6 neigh add {} dev {} lladdr {}".format(v6ips[i], intfs[1-i%2], macs[i]))
+
+    # start neighsyncd again
+    start_neighsyncd(dvs)
+    time.sleep(10)
+
+    # timer is not expired yet, state should be "restored"
+    swss_app_check_warmstart_state(state_db, "neighsyncd", "restored")
+    time.sleep(10)
+
+    # check the neighsyncd timer is retrieved from configDB
+    check_neighsyncd_timer(dvs, timer_value)
+
+    # check ipv4 and ipv6 neighbors, should see all neighbors with updated info
+    for i in range(len(ips)):
+        if i % 2 == 0:
+            (status, fvs) = tbl.get("{}:{}".format(intfs[i%2], ips[i]))
+            assert status == True
+            for v in fvs:
+                if v[0] == "neigh":
+                    assert v[1] == newmacs[i]
+                if v[0] == "family":
+                    assert v[1] == "IPv4"
+        else:
+            (status, fvs) = tbl.get("{}:{}".format(intfs[1-i%2], ips[i]))
+            assert status == True
+            for v in fvs:
+                if v[0] == "neigh":
+                    assert v[1] == macs[i]
+                if v[0] == "family":
+                    assert v[1] == "IPv4"
+
+    for i in range(len(v6ips)):
+        if i % 2 == 0:
+            (status, fvs) = tbl.get("{}:{}".format(intfs[i%2], v6ips[i]))
+            assert status == True
+            for v in fvs:
+                if v[0] == "neigh":
+                    assert v[1] == newmacs[i]
+                if v[0] == "family":
+                    assert v[1] == "IPv6"
+        else:
+            (status, fvs) = tbl.get("{}:{}".format(intfs[1-i%2], v6ips[i]))
+            assert status == True
+            for v in fvs:
+                if v[0] == "neigh":
+                    assert v[1] == macs[i]
+                if v[0] == "family":
+                    assert v[1] == "IPv6"
+
+    # check syslog and sairedis.rec file for activities
+    # 4 new entries and 2 deletes for ipv4 and ipv6 each
+    # 4 creates, 4 sets, 4 removes for sairedis
+    check_syslog_for_neighbor_entry(dvs, 4, 2, "ipv4")
+    check_syslog_for_neighbor_entry(dvs, 4, 2, "ipv6")
+    check_sairedis_for_neighbor_entry(dvs, 4, 4, 4)
+
+    # check restart Count
+    swss_app_check_RestartCount_single(state_db, restart_count, "neighsyncd")
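The syslog and sairedis checks above hinge on the cache-state flag that AppRestartAssist (next file) attaches to every cached entry and logs while reconciling. A minimal, illustrative mapping of state to reconcile action — not code from this patch — reads roughly as follows:

```cpp
// Illustrative only (not code from this patch): the action reconcile() takes
// for each cache-state value that the grep-based checks above count.
enum cache_state_t { STALE, SAME, NEW, DELETE };

const char *reconcileAction(cache_state_t state)
{
    switch (state)
    {
    case SAME:   return "no-op: kernel entry matches the restored APP_DB entry";
    case NEW:    return "ProducerStateTable::set: new or changed entry";
    case STALE:  return "ProducerStateTable::del: restored entry never re-learned";
    case DELETE: return "ProducerStateTable::del: kernel reported delete/incomplete/failed";
    default:     return "invalid";
    }
}
```

STALE entries come from readTableToMap() restoring the old APP_DB content; SAME, NEW and DELETE are set as the kernel dump replays through insertToMap().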
diff --git a/warmrestart/warmRestartAssist.cpp b/warmrestart/warmRestartAssist.cpp
new file mode 100644
index 000000000000..bfce3365b31f
--- /dev/null
+++ b/warmrestart/warmRestartAssist.cpp
@@ -0,0 +1,269 @@
+#include <string>
+#include "logger.h"
+#include "schema.h"
+#include "warm_restart.h"
+#include "warmRestartAssist.h"
+
+using namespace std;
+using namespace swss;
+
+const AppRestartAssist::cache_state_map AppRestartAssist::cacheStateMap =
+{
+    {STALE,     "STALE"},
+    {SAME,      "SAME"},
+    {NEW,       "NEW"},
+    {DELETE,    "DELETE"}
+};
+
+AppRestartAssist::AppRestartAssist(RedisPipeline *pipeline,
+    const std::string &appName, const std::string &dockerName,
+    ProducerStateTable *psTable, const uint32_t defaultWarmStartTimerValue):
+    m_appTable(pipeline, APP_NEIGH_TABLE_NAME, false),
+    m_appName(appName),
+    m_dockerName(dockerName),
+    m_psTable(psTable),
+    m_warmStartTimer(timespec{0, 0})
+{
+    WarmStart::checkWarmStart(m_appName, m_dockerName);
+
+    m_appTableName = m_appTable.getTableName();
+
+    // set the default timer value
+    if (defaultWarmStartTimerValue > MAXIMUM_WARMRESTART_TIMER_VALUE)
+    {
+        throw std::invalid_argument("invalid timer value was provided");
+    }
+    else if (defaultWarmStartTimerValue != 0)
+    {
+        m_reconcileTimer = defaultWarmStartTimerValue;
+    }
+    else
+    {
+        m_reconcileTimer = DEFAULT_INTERNAL_TIMER_VALUE;
+    }
+
+    if (!WarmStart::isWarmStart())
+    {
+        m_warmStartInProgress = false;
+    }
+    else
+    {
+        m_warmStartInProgress = true;
+
+        uint32_t temp_value = WarmStart::getWarmStartTimer(m_appName, m_dockerName);
+        if (temp_value != 0)
+        {
+            m_reconcileTimer = temp_value;
+        }
+
+        m_warmStartTimer.setInterval(timespec{m_reconcileTimer, 0});
+
+        // Clear the producer state table to make sure there is no pending data for the AppTable
+        m_psTable->clear();
+
+        WarmStart::setWarmStartState(m_appName, WarmStart::INIT);
+    }
+}
+
+AppRestartAssist::~AppRestartAssist()
+{
+}
+
+/* join the field-value strings for straight printing */
+string AppRestartAssist::joinVectorString(const vector<FieldValueTuple> &fv)
+{
+    string s;
+    for (const auto &temps : fv )
+    {
+        s += temps.first + ":" + temps.second + ", ";
+    }
+    return s;
+}
+
+void AppRestartAssist::setCacheEntryState(std::vector<FieldValueTuple> &fvVector,
+    cache_state_t state)
+{
+    fvVector.back().second = cacheStateMap.at(state);
+}
+
+AppRestartAssist::cache_state_t AppRestartAssist::getCacheEntryState(const std::vector<FieldValueTuple> &fvVector)
+{
+    for (auto &iter : cacheStateMap)
+    {
+        if (fvVector.back().second == iter.second)
+        {
+            return iter.first;
+        }
+    }
+    throw std::logic_error("cache entry state is invalid");
+}
+
+/* Read table from APP_DB, append the STALE flag, then insert into the cache map */
+void AppRestartAssist::readTableToMap()
+{
+    vector<string> keys;
+
+    m_appTable.getKeys(keys);
+    FieldValueTuple state(CACHE_STATE_FIELD, "");
+
+    for (const auto &key: keys)
+    {
+        vector<FieldValueTuple> fv;
+
+        /* if the fieldvalue is empty, skip */
+        if (!m_appTable.get(key, fv))
+        {
+            continue;
+        }
+
+        fv.push_back(state);
+        setCacheEntryState(fv, STALE);
+
+        string s = joinVectorString(fv);
+
+        SWSS_LOG_INFO("write to cachemap: %s, key: %s, "
+                "%s", m_appTableName.c_str(), key.c_str(), s.c_str());
+
+        // insert to the cache map
+        appTableCacheMap[key] = fv;
+    }
+    WarmStart::setWarmStartState(m_appName, WarmStart::RESTORED);
+    SWSS_LOG_NOTICE("Restored appDB table to internal cache map");
+    return;
+}
+
+void AppRestartAssist::insertToMap(string key, vector<FieldValueTuple> fvVector, bool delete_key)
+{
+    SWSS_LOG_INFO("Received message %s, key: %s, "
+            "%s, delete = %d", m_appTableName.c_str(), key.c_str(), joinVectorString(fvVector).c_str(), delete_key);
+
+    /*
+     * Check and insert to CacheMap logic:
+     * if delete_key, mark the entry as "DELETE";
+     * else:
+     *   if key exists {
+     *     if it has a different value: update with "NEW" flag;
+     *     if it has the same value: mark it as "SAME";
+     *   } else {
+     *     insert with "NEW" flag.
+     *   }
+     */
+
+    auto found = appTableCacheMap.find(key);
+
+    if (delete_key)
+    {
+        SWSS_LOG_NOTICE("%s, delete key: %s, ", m_appTableName.c_str(), key.c_str());
+        /* mark it as DELETE if it exists; otherwise, no-op */
+        if (found != appTableCacheMap.end())
+        {
+            setCacheEntryState(found->second, DELETE);
+        }
+    }
+    else if (found != appTableCacheMap.end())
+    {
+        /* check only the original vector range (exclude cache-state field/value) */
+        if (!equal(fvVector.begin(), fvVector.end(), found->second.begin()))
+        {
+            SWSS_LOG_NOTICE("%s, found key: %s, new value ", m_appTableName.c_str(), key.c_str());
+
+            FieldValueTuple state(CACHE_STATE_FIELD, "");
+            fvVector.push_back(state);
+
+            // mark as NEW flag
+            setCacheEntryState(fvVector, NEW);
+            appTableCacheMap[key] = fvVector;
+        }
+        else
+        {
+            SWSS_LOG_INFO("%s, found key: %s, same value", m_appTableName.c_str(), key.c_str());
+
+            // mark as SAME flag
+            setCacheEntryState(found->second, SAME);
+        }
+    }
+    else
+    {
+        SWSS_LOG_NOTICE("%s, not found key: %s, new", m_appTableName.c_str(), key.c_str());
+        FieldValueTuple state(CACHE_STATE_FIELD, "");
+        fvVector.push_back(state);
+        setCacheEntryState(fvVector, NEW);
+        appTableCacheMap[key] = fvVector;
+    }
+
+    return;
+}
+
+void AppRestartAssist::reconcile()
+{
+    /*
+     * Iterate through the cache map:
+     * if the entry has the "SAME" flag, do nothing;
+     * if it has the "STALE"/"DELETE" flag, delete it from appDB;
+     * else if it has the "NEW" flag, add it to appDB;
+     * else, throw (should not happen).
+     */
+    SWSS_LOG_ENTER();
+    for (auto iter = appTableCacheMap.begin(); iter != appTableCacheMap.end(); ++iter )
+    {
+        string s = joinVectorString(iter->second);
+        auto state = getCacheEntryState(iter->second);
+
+        if (state == SAME)
+        {
+            SWSS_LOG_INFO("%s SAME, key: %s, %s",
+                    m_appTableName.c_str(), iter->first.c_str(), s.c_str());
+            continue;
+        }
+        else if (state == STALE || state == DELETE)
+        {
+            SWSS_LOG_NOTICE("%s STALE/DELETE, key: %s, %s",
+                    m_appTableName.c_str(), iter->first.c_str(), s.c_str());
+
+            // delete from appDB
+            m_psTable->del(iter->first);
+        }
+        else if (state == NEW)
+        {
+            SWSS_LOG_NOTICE("%s NEW, key: %s, %s",
+                    m_appTableName.c_str(), iter->first.c_str(), s.c_str());
+
+            // add to appDB, excluding the cache-state field
+            iter->second.pop_back();
+            m_psTable->set(iter->first, iter->second);
+        }
+        else
+        {
+            throw std::logic_error("cache entry state is invalid");
+        }
+    }
+    // clear the map
+    appTableCacheMap.clear();
+    WarmStart::setWarmStartState(m_appName, WarmStart::RECONCILED);
+    m_warmStartInProgress = false;
+    return;
+}
+
+// start the timer, take Select class "s" to add the timer.
+void AppRestartAssist::startReconcileTimer(Select &s)
+{
+    m_warmStartTimer.start();
+    s.addSelectable(&m_warmStartTimer);
+}
+
+// stop the timer, take Select class "s" to remove the timer.
+void AppRestartAssist::stopReconcileTimer(Select &s)
+{
+    m_warmStartTimer.stop();
+    s.removeSelectable(&m_warmStartTimer);
+}
+
+// take Selectable class pointer "*s" to check if the timer expired.
+bool AppRestartAssist::checkReconcileTimer(Selectable *s)
+{
+    if (s == &m_warmStartTimer)
+    {
+        SWSS_LOG_INFO("warmstart timer expired");
+        return true;
+    }
+    return false;
+}
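One detail of the constructor above that the tests exercise is how the reconcile interval is chosen: the built-in 5-second fallback, then the caller-supplied default (DEFAULT_NEIGHSYNC_WARMSTART_TIMER for neighsyncd), then any non-zero value returned by WarmStart::getWarmStartTimer() from configDB. A standalone sketch of that precedence, with illustrative names of my own:

```cpp
#include <cstdint>

// Illustrative helper (names are mine, not the patch's): the precedence the
// AppRestartAssist constructor applies when picking the reconcile interval.
// The real constructor additionally throws if ctorDefault exceeds
// MAXIMUM_WARMRESTART_TIMER_VALUE.
static const uint32_t DEFAULT_INTERNAL_TIMER_VALUE = 5;   // built-in fallback, seconds

uint32_t pickReconcileTimer(uint32_t ctorDefault, uint32_t configDbTimer)
{
    uint32_t timer = DEFAULT_INTERNAL_TIMER_VALUE;
    if (ctorDefault != 0)
    {
        timer = ctorDefault;        // e.g. DEFAULT_NEIGHSYNC_WARMSTART_TIMER (5)
    }
    if (configDbTimer != 0)
    {
        timer = configDbTimer;      // WarmStart::getWarmStartTimer() result wins
    }
    return timer;
}
```

In Testcase 5 above, configuring neighsyncd_timer=15 therefore overrides the compiled-in 5-second default, which is why the warm-restart state is still "restored" after the first 10-second sleep.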
diff --git a/warmrestart/warmRestartAssist.h b/warmrestart/warmRestartAssist.h
new file mode 100644
index 000000000000..8867147897c5
--- /dev/null
+++ b/warmrestart/warmRestartAssist.h
@@ -0,0 +1,69 @@
+#ifndef __WARM_RESTART_ASSIST__
+#define __WARM_RESTART_ASSIST__
+
+#include <string>
+#include <unordered_map>
+#include "dbconnector.h"
+#include "table.h"
+#include "producerstatetable.h"
+#include "selectabletimer.h"
+#include "select.h"
+
+namespace swss {
+
+/*
+ * This class supports application table reconciliation
+ * for any application table whose entries have the form key -> vector<FieldValueTuple>.
+ */
+class AppRestartAssist
+{
+public:
+    AppRestartAssist(RedisPipeline *pipeline,
+        const std::string &appName, const std::string &dockerName,
+        ProducerStateTable *psTable, const uint32_t defaultWarmStartTimerValue = 0);
+    virtual ~AppRestartAssist();
+
+    enum cache_state_t
+    {
+        STALE   = 0,
+        SAME    = 1,
+        NEW     = 2,
+        DELETE  = 3
+    };
+    void startReconcileTimer(Select &s);
+    void stopReconcileTimer(Select &s);
+    bool checkReconcileTimer(Selectable *s);
+    void readTableToMap(void);
+    void insertToMap(std::string key, std::vector<FieldValueTuple> fvVector, bool delete_key);
+    void reconcile(void);
+    bool isWarmStartInProgress(void)
+    {
+        return m_warmStartInProgress;
+    }
+
+private:
+    typedef std::map<cache_state_t, std::string> cache_state_map;
+    static const cache_state_map cacheStateMap;
+    const std::string CACHE_STATE_FIELD = "cache-state";
+    static const uint32_t DEFAULT_INTERNAL_TIMER_VALUE = 5;
+
+    typedef std::unordered_map<std::string, std::vector<FieldValueTuple>> AppTableMap;
+    AppTableMap appTableCacheMap;
+
+    Table m_appTable;
+    std::string m_dockerName;
+    std::string m_appName;
+    ProducerStateTable *m_psTable;
+    std::string m_appTableName;
+
+    bool m_warmStartInProgress;
+    uint32_t m_reconcileTimer;
+    SelectableTimer m_warmStartTimer;
+
+    std::string joinVectorString(const std::vector<FieldValueTuple> &fv);
+    void setCacheEntryState(std::vector<FieldValueTuple> &fvVector, cache_state_t state);
+    cache_state_t getCacheEntryState(const std::vector<FieldValueTuple> &fvVector);
+};
+
+}
+
+#endif
diff --git a/warmrestart/warm_restart.cpp b/warmrestart/warm_restart.cpp
index ebb5c8e89580..93b53a0283a0 100644
--- a/warmrestart/warm_restart.cpp
+++ b/warmrestart/warm_restart.cpp
@@ -111,7 +111,7 @@ uint32_t WarmStart::getWarmStartTimer(const std::string &app_name,
     warmStart.m_cfgWarmRestartTable->hget(docker_name, timer_name, timer_value_str);
 
     unsigned long int temp_value = strtoul(timer_value_str.c_str(), NULL, 0);
-    if (temp_value != 0 && temp_value != ULONG_MAX && temp_value <= MAXIMUN_WARMRESTART_TIMER_VALUE)
+    if (temp_value != 0 && temp_value != ULONG_MAX && temp_value <= MAXIMUM_WARMRESTART_TIMER_VALUE)
     {
         SWSS_LOG_NOTICE("Getting warmStartTimer for docker: %s, app: %s, value: %lu",
             docker_name.c_str(), app_name.c_str(), temp_value);
diff --git a/warmrestart/warm_restart.h b/warmrestart/warm_restart.h
index 234a3004cdef..06361c5e1f0c 100644
--- a/warmrestart/warm_restart.h
+++ b/warmrestart/warm_restart.h
@@ -5,7 +5,7 @@
 #include "dbconnector.h"
 #include "table.h"
 
-#define MAXIMUN_WARMRESTART_TIMER_VALUE 9999
+#define MAXIMUM_WARMRESTART_TIMER_VALUE 9999
 
 namespace swss {
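Finally, the MAXIMUN to MAXIMUM rename keeps warm_restart.{cpp,h} consistent with the new warmRestartAssist code. The range check in getWarmStartTimer() is also what check_no_neighsyncd_timer() relies on: when the configDB value is absent or out of range, the timer is treated as unconfigured (the function returns 0 and, per the test's grep, an "invalid" message is logged). A sketch of the accepted range, under that assumption:

```cpp
#include <climits>

#define MAXIMUM_WARMRESTART_TIMER_VALUE 9999

// Illustrative helper (not part of the patch): the value parsed from the
// configDB WARM_RESTART table is only honored when it falls in this range.
static bool isUsableWarmRestartTimer(unsigned long v)
{
    return v != 0 && v != ULONG_MAX && v <= MAXIMUM_WARMRESTART_TIMER_VALUE;
}
```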