From dc695fb2b81940a3da37eace5505167e2f916641 Mon Sep 17 00:00:00 2001 From: zhenggen-xu Date: Wed, 29 Jan 2020 12:15:39 -0800 Subject: [PATCH] [orch] change Consumer class to support multiple values for the same key (#1184) Description The Consumer class is used by the orch objects to deal with the redis consumer tables' popped items. It has m_toSync map to save the tasks. During the operations (more items than the orch objects can handle), the tasks in the map is merged for optimization. However, since it is a map, we would only have one value for each key. This potentially eliminate the necessary actions from Redis, e,g, we have a DEL action and SET action coming out from Redis for some key, we would overwrite the DEL action today in the code and thus not able to delete some objects as we intended. The PR changed the m_toSync from map to multi-map to get multiple values for one key. In this design, we keep maximun two values per key, DEL or SET or DEL+SET. We need strictly keep the order of DEL and SET. It is possible to use map of vectors to fulfill this, we chose multi-map because: 1, It will have less/no changes to different orch classes to iterate the m_toSync 2, The order can be guaranteed. The order of the key-value pairs whose keys compare equivalent is the order of insertion and does not change. (since C++11). See https://en.cppreference.com/w/cpp/container/multimap The PR also refactors the consumer class so vlanmgr.cpp and routeorch.cpp will leverage the Consumer functions instead of operating on the members. It also refactors the UT code (aclorch_ut.cpp) so it removes the redundant code and uses the same code. Google UT tests were added for Consumer Class especially for different cases for addToSync() function. What I did Change the m_toSync in Consumer class to multimap so it could support both DEL and SET Reload Consumer addToSync() and refactor vlanmgr/route-orch and ut code to use it Add google ut for consumer class Why I did it See description. How I verified it Unit tests: Running main() from gtest_main.cc ``` [==========] Running 19 tests from 5 test cases. [----------] Global test environment set-up. [----------] 1 test from AclTest [ RUN ] AclTest.Create_L3_Acl_Table [ OK ] AclTest.Create_L3_Acl_Table (1 ms) [----------] 1 test from AclTest (1 ms total) [----------] 3 tests from AclOrchTest [ RUN ] AclOrchTest.ACL_Creation_and_Destorying [ OK ] AclOrchTest.ACL_Creation_and_Destorying (1000 ms) [ RUN ] AclOrchTest.L3Acl_Matches_Actions [ OK ] AclOrchTest.L3Acl_Matches_Actions (1001 ms) [ RUN ] AclOrchTest.L3V6Acl_Matches_Actions [ OK ] AclOrchTest.L3V6Acl_Matches_Actions (1000 ms) [----------] 3 tests from AclOrchTest (3003 ms total) [----------] 2 tests from PortsOrchTest [ RUN ] PortsOrchTest.PortReadinessColdBoot [ OK ] PortsOrchTest.PortReadinessColdBoot (21 ms) [ RUN ] PortsOrchTest.PortReadinessWarmBoot [ OK ] PortsOrchTest.PortReadinessWarmBoot (13 ms) [----------] 2 tests from PortsOrchTest (34 ms total) [----------] 4 tests from SaiSpy [ RUN ] SaiSpy.CURD [ OK ] SaiSpy.CURD (0 ms) [ RUN ] SaiSpy.Same_Function_Signature_In_Same_API_Table [ OK ] SaiSpy.Same_Function_Signature_In_Same_API_Table (0 ms) [ RUN ] SaiSpy.Same_Function_Signature_In_Different_API_Table [ OK ] SaiSpy.Same_Function_Signature_In_Different_API_Table (0 ms) [ RUN ] SaiSpy.create_switch_and_acl_table [ OK ] SaiSpy.create_switch_and_acl_table (0 ms) [----------] 4 tests from SaiSpy (0 ms total) [----------] 9 tests from ConsumerTest [ RUN ] ConsumerTest.ConsumerAddToSync_Set [ OK ] ConsumerTest.ConsumerAddToSync_Set (1 ms) [ RUN ] ConsumerTest.ConsumerAddToSync_Del [ OK ] ConsumerTest.ConsumerAddToSync_Del (0 ms) [ RUN ] ConsumerTest.ConsumerAddToSync_Set_Del [ OK ] ConsumerTest.ConsumerAddToSync_Set_Del (0 ms) [ RUN ] ConsumerTest.ConsumerAddToSync_Del_Set [ OK ] ConsumerTest.ConsumerAddToSync_Del_Set (130 ms) [ RUN ] ConsumerTest.ConsumerAddToSync_Set_Del_Set_Multi [ OK ] ConsumerTest.ConsumerAddToSync_Set_Del_Set_Multi (204 ms) [ RUN ] ConsumerTest.ConsumerAddToSync_Set_Del_Set_Multi_In_Q [ OK ] ConsumerTest.ConsumerAddToSync_Set_Del_Set_Multi_In_Q (4 ms) [ RUN ] ConsumerTest.ConsumerAddToSync_Del_Set_Setnew [ OK ] ConsumerTest.ConsumerAddToSync_Del_Set_Setnew (0 ms) [ RUN ] ConsumerTest.ConsumerAddToSync_Del_Set_Setnew1 [ OK ] ConsumerTest.ConsumerAddToSync_Del_Set_Setnew1 (0 ms) [ RUN ] ConsumerTest.ConsumerAddToSync_Ind_Set_Del [ OK ] ConsumerTest.ConsumerAddToSync_Ind_Set_Del (0 ms) [----------] 9 tests from ConsumerTest (340 ms total) [----------] Global test environment tear-down [==========] 19 tests from 5 test cases ran. (4344 ms total) [ PASSED ] 19 tests. ``` Signed-off-by: Zhenggen Xu --- cfgmgr/vlanmgr.cpp | 5 +- orchagent/orch.cpp | 82 ++++++-- orchagent/orch.h | 17 +- orchagent/routeorch.cpp | 4 +- tests/mock_tests/Makefile.am | 1 + tests/mock_tests/aclorch_ut.cpp | 61 +----- tests/mock_tests/consumer_ut.cpp | 328 +++++++++++++++++++++++++++++++ 7 files changed, 412 insertions(+), 86 deletions(-) create mode 100644 tests/mock_tests/consumer_ut.cpp diff --git a/cfgmgr/vlanmgr.cpp b/cfgmgr/vlanmgr.cpp index 888ced509a19..03b0fdc6d43e 100644 --- a/cfgmgr/vlanmgr.cpp +++ b/cfgmgr/vlanmgr.cpp @@ -447,8 +447,9 @@ void VlanMgr::processUntaggedVlanMembers(string vlan, const string &members) vector fvVector; FieldValueTuple t("tagging_mode", "untagged"); fvVector.push_back(t); - consumer.m_toSync[member_key] = make_tuple(member_key, SET_COMMAND, fvVector); - SWSS_LOG_DEBUG("%s", (dumpTuple(consumer, consumer.m_toSync[member_key])).c_str()); + KeyOpFieldsValuesTuple tuple = make_tuple(member_key, SET_COMMAND, fvVector); + consumer.addToSync(tuple); + SWSS_LOG_DEBUG("%s", (dumpTuple(consumer, tuple)).c_str()); } /* * There is pending task from consumer pipe, in this case just skip it. diff --git a/orchagent/orch.cpp b/orchagent/orch.cpp index a6dce7efbb07..6dcbc6c3d656 100644 --- a/orchagent/orch.cpp +++ b/orchagent/orch.cpp @@ -67,36 +67,66 @@ vector Orch::getSelectables() return selectables; } -size_t Consumer::addToSync(std::deque &entries) +void Consumer::addToSync(const KeyOpFieldsValuesTuple &entry) { SWSS_LOG_ENTER(); - /* Nothing popped */ - if (entries.empty()) + + string key = kfvKey(entry); + string op = kfvOp(entry); + + /* Record incoming tasks */ + if (gSwssRecord) { - return 0; + Orch::recordTuple(*this, entry); } - for (auto& entry: entries) + /* + * m_toSync is a multimap which will allow one key with multiple values, + * Also, the order of the key-value pairs whose keys compare equivalent + * is the order of insertion and does not change. (since C++11) + */ + + /* If a new task comes we directly put it into getConsumerTable().m_toSync map */ + if (m_toSync.find(key) == m_toSync.end()) { - string key = kfvKey(entry); - string op = kfvOp(entry); + m_toSync.emplace(key, entry); + } - /* Record incoming tasks */ - if (gSwssRecord) + /* if a DEL task comes, we overwrite the old key */ + else if (op == DEL_COMMAND) + { + m_toSync.erase(key); + m_toSync.emplace(key, entry); + } + else + { + /* + * Now we are trying to add the key-value with SET. + * We maintain maximun two values per key. + * In case there is one key-value, it should be DEL or SET + * In case there are two key-value pairs, it should be DEL then SET + * The code logic is following: + * We iterate the values with the key, we skip the value with DEL and then + * check if that was the only one (I,E, the iter pointer now points to end or next key), + * in such case, we insert the key-value with SET. + * If there was a SET already (I,E, the pointer still points to the same key), we combine the kfv. + */ + auto ret = m_toSync.equal_range(key); + auto iter = ret.first; + for (; iter != ret.second; ++iter) { - Orch::recordTuple(*this, entry); + auto old_op = kfvOp(iter->second); + if (old_op == SET_COMMAND) + break; } - - /* If a new task comes or if a DEL task comes, we directly put it into getConsumerTable().m_toSync map */ - if (m_toSync.find(key) == m_toSync.end() || op == DEL_COMMAND) + if (iter == ret.second) { - m_toSync[key] = entry; + m_toSync.emplace(key, entry); } - /* If an old task is still there, we combine the old task with new task */ else { - KeyOpFieldsValuesTuple existing_data = m_toSync[key]; + KeyOpFieldsValuesTuple existing_data = iter->second; auto new_values = kfvFieldsValues(entry); auto existing_values = kfvFieldsValues(existing_data); @@ -118,9 +148,21 @@ size_t Consumer::addToSync(std::deque &entries) } existing_values.push_back(FieldValueTuple(field, value)); } - m_toSync[key] = KeyOpFieldsValuesTuple(key, op, existing_values); + iter->second = KeyOpFieldsValuesTuple(key, op, existing_values); } } + +} + +size_t Consumer::addToSync(const std::deque &entries) +{ + SWSS_LOG_ENTER(); + + for (auto& entry: entries) + { + addToSync(entry); + } + return entries.size(); } @@ -186,7 +228,7 @@ void Consumer::drain() m_orch->doTask(*this); } -string Consumer::dumpTuple(KeyOpFieldsValuesTuple &tuple) +string Consumer::dumpTuple(const KeyOpFieldsValuesTuple &tuple) { string s = getTableName() + getConsumerTable()->getTableNameSeparator() + kfvKey(tuple) + "|" + kfvOp(tuple); @@ -412,7 +454,7 @@ void Orch::logfileReopen() } } -void Orch::recordTuple(Consumer &consumer, KeyOpFieldsValuesTuple &tuple) +void Orch::recordTuple(Consumer &consumer, const KeyOpFieldsValuesTuple &tuple) { string s = consumer.dumpTuple(tuple); @@ -426,7 +468,7 @@ void Orch::recordTuple(Consumer &consumer, KeyOpFieldsValuesTuple &tuple) } } -string Orch::dumpTuple(Consumer &consumer, KeyOpFieldsValuesTuple &tuple) +string Orch::dumpTuple(Consumer &consumer, const KeyOpFieldsValuesTuple &tuple) { string s = consumer.dumpTuple(tuple); return s; diff --git a/orchagent/orch.h b/orchagent/orch.h index b00716e1747a..3e25e96afe6f 100644 --- a/orchagent/orch.h +++ b/orchagent/orch.h @@ -57,7 +57,11 @@ typedef std::pair object_map_pair; typedef std::map type_map; typedef std::pair type_map_pair; -typedef std::map SyncMap; + +// Use multimap to support multiple OpFieldsValues for the same key (e,g, DEL and SET) +// The order of the key-value pairs whose keys compare equivalent is the order of +// insertion and does not change. (since C++11) +typedef std::multimap SyncMap; typedef std::pair table_name_with_pri_t; @@ -132,7 +136,7 @@ class Consumer : public Executor { return getConsumerTable()->getDbId(); } - std::string dumpTuple(swss::KeyOpFieldsValuesTuple &tuple); + std::string dumpTuple(const swss::KeyOpFieldsValuesTuple &tuple); void dumpPendingTasks(std::vector &ts); size_t refillToSync(); @@ -144,9 +148,10 @@ class Consumer : public Executor { // TODO: hide? SyncMap m_toSync; -protected: + void addToSync(const swss::KeyOpFieldsValuesTuple &entry); + // Returns: the number of entries added to m_toSync - size_t addToSync(std::deque &entries); + size_t addToSync(const std::deque &entries); }; typedef std::map> ConsumerMap; @@ -194,14 +199,14 @@ class Orch virtual void doTask(swss::SelectableTimer &timer) { } /* TODO: refactor recording */ - static void recordTuple(Consumer &consumer, swss::KeyOpFieldsValuesTuple &tuple); + static void recordTuple(Consumer &consumer, const swss::KeyOpFieldsValuesTuple &tuple); void dumpPendingTasks(std::vector &ts); protected: ConsumerMap m_consumerMap; static void logfileReopen(); - std::string dumpTuple(Consumer &consumer, swss::KeyOpFieldsValuesTuple &tuple); + std::string dumpTuple(Consumer &consumer, const swss::KeyOpFieldsValuesTuple &tuple); ref_resolve_status resolveFieldRefValue(type_map&, const std::string&, swss::KeyOpFieldsValuesTuple&, sai_object_id_t&); bool parseIndexRange(const std::string &input, sai_uint32_t &range_low, sai_uint32_t &range_high); bool parseReference(type_map &type_maps, std::string &ref, std::string &table_name, std::string &object_name); diff --git a/orchagent/routeorch.cpp b/orchagent/routeorch.cpp index 8aabef4a9e55..27338ef35cec 100644 --- a/orchagent/routeorch.cpp +++ b/orchagent/routeorch.cpp @@ -134,7 +134,7 @@ std::string RouteOrch::getLinkLocalEui64Addr(void) uint8_t eui64_interface_id[EUI64_INTF_ID_LEN]; char ipv6_ll_addr[INET6_ADDRSTRLEN] = {0}; - + /* Link-local IPv6 address autogenerated by kernel with eui64 interface-id * derived from the MAC address of the host interface. */ @@ -406,7 +406,7 @@ void RouteOrch::doTask(Consumer& consumer) vector v; key = vrf + i.first.to_string(); auto x = KeyOpFieldsValuesTuple(key, DEL_COMMAND, v); - consumer.m_toSync[key] = x; + consumer.addToSync(x); } } m_resync = true; diff --git a/tests/mock_tests/Makefile.am b/tests/mock_tests/Makefile.am index 4c0b5582cecd..a7f89475a606 100644 --- a/tests/mock_tests/Makefile.am +++ b/tests/mock_tests/Makefile.am @@ -23,6 +23,7 @@ LDADD_GTEST = -L/usr/src/gtest tests_SOURCES = aclorch_ut.cpp \ portsorch_ut.cpp \ saispy_ut.cpp \ + consumer_ut.cpp \ ut_saihelper.cpp \ mock_orchagent_main.cpp \ mock_dbconnector.cpp \ diff --git a/tests/mock_tests/aclorch_ut.cpp b/tests/mock_tests/aclorch_ut.cpp index 66b0090d23a3..dc784c75ae84 100644 --- a/tests/mock_tests/aclorch_ut.cpp +++ b/tests/mock_tests/aclorch_ut.cpp @@ -23,54 +23,6 @@ namespace aclorch_test { using namespace std; - size_t consumerAddToSync(Consumer *consumer, const deque &entries) - { - /* Nothing popped */ - if (entries.empty()) - { - return 0; - } - - for (auto &entry : entries) - { - string key = kfvKey(entry); - string op = kfvOp(entry); - - /* If a new task comes or if a DEL task comes, we directly put it into getConsumerTable().m_toSync map */ - if (consumer->m_toSync.find(key) == consumer->m_toSync.end() || op == DEL_COMMAND) - { - consumer->m_toSync[key] = entry; - } - /* If an old task is still there, we combine the old task with new task */ - else - { - KeyOpFieldsValuesTuple existing_data = consumer->m_toSync[key]; - - auto new_values = kfvFieldsValues(entry); - auto existing_values = kfvFieldsValues(existing_data); - - for (auto it : new_values) - { - string field = fvField(it); - string value = fvValue(it); - - auto iu = existing_values.begin(); - while (iu != existing_values.end()) - { - string ofield = fvField(*iu); - if (field == ofield) - iu = existing_values.erase(iu); - else - iu++; - } - existing_values.push_back(FieldValueTuple(field, value)); - } - consumer->m_toSync[key] = KeyOpFieldsValuesTuple(key, op, existing_values); - } - } - return entries.size(); - } - struct AclTestBase : public ::testing::Test { vector m_s32list_pool; @@ -199,8 +151,7 @@ namespace aclorch_test auto consumer = unique_ptr(new Consumer( new swss::ConsumerStateTable(config_db, CFG_ACL_TABLE_TABLE_NAME, 1, 1), m_aclOrch, CFG_ACL_TABLE_TABLE_NAME)); - consumerAddToSync(consumer.get(), entries); - + consumer->addToSync(entries); static_cast(m_aclOrch)->doTask(*consumer); } @@ -209,8 +160,7 @@ namespace aclorch_test auto consumer = unique_ptr(new Consumer( new swss::ConsumerStateTable(config_db, CFG_ACL_RULE_TABLE_NAME, 1, 1), m_aclOrch, CFG_ACL_RULE_TABLE_NAME)); - consumerAddToSync(consumer.get(), entries); - + consumer->addToSync(entries); static_cast(m_aclOrch)->doTask(*consumer); } @@ -381,8 +331,7 @@ namespace aclorch_test auto consumer = unique_ptr(new Consumer( new swss::ConsumerStateTable(m_app_db.get(), APP_PORT_TABLE_NAME, 1, 1), gPortsOrch, APP_PORT_TABLE_NAME)); - consumerAddToSync(consumer.get(), { { "PortInitDone", EMPTY_PREFIX, { { "", "" } } } }); - + consumer->addToSync({ { "PortInitDone", EMPTY_PREFIX, { { "", "" } } } }); static_cast(gPortsOrch)->doTask(*consumer.get()); } @@ -628,7 +577,7 @@ namespace aclorch_test // consistency validation with CRM bool validateResourceCountWithCrm(const AclOrch *aclOrch, CrmOrch *crmOrch) { - // Verify ACL Tables + // Verify ACL Tables auto const &resourceMap = Portal::CrmOrchInternal::getResourceMap(crmOrch); uint32_t crm_acl_table_cnt = 0; for (auto const &kv : resourceMap.at(CrmResourceType::CRM_ACL_TABLE).countersMap) @@ -642,7 +591,7 @@ namespace aclorch_test << ") and AclOrch " << Portal::AclOrchInternal::getAclTables(aclOrch).size(); return false; } - + // Verify ACL Rules // diff --git a/tests/mock_tests/consumer_ut.cpp b/tests/mock_tests/consumer_ut.cpp new file mode 100644 index 000000000000..0facd1526987 --- /dev/null +++ b/tests/mock_tests/consumer_ut.cpp @@ -0,0 +1,328 @@ +#include "ut_helper.h" +#include "mock_orchagent_main.h" +#include "mock_table.h" + +#include + +extern PortsOrch *gPortsOrch; + +namespace consumer_test +{ + using namespace std; + + struct ConsumerTest : public ::testing::Test + { + shared_ptr m_app_db; + shared_ptr m_config_db; + shared_ptr m_state_db; + + string key = "key"; + string f1 = "field1"; + string v1a = "value1_a"; + string v1b = "value1_b"; + string f2 = "field2"; + string v2a = "value2_a"; + string v2b = "value2_b"; + string f3 = "field3"; + string v3a = "value3_a"; + KeyOpFieldsValuesTuple exp_kofv; + + unique_ptr consumer; + deque kofv_q; + + ConsumerTest() + { + // FIXME: move out from constructor + m_app_db = make_shared( + APPL_DB, swss::DBConnector::DEFAULT_UNIXSOCKET, 0); + m_config_db = make_shared( + CONFIG_DB, swss::DBConnector::DEFAULT_UNIXSOCKET, 0); + m_state_db = make_shared( + STATE_DB, swss::DBConnector::DEFAULT_UNIXSOCKET, 0); + consumer = unique_ptr(new Consumer( + new swss::ConsumerStateTable(m_config_db.get(), "CFG_TEST_TABLE", 1, 1), gPortsOrch, "CFG_TEST_TABLE")); + } + + virtual void SetUp() override + { + ::testing_db::reset(); + } + + virtual void TearDown() override + { + ::testing_db::reset(); + } + + void validate_syncmap(SyncMap &sync, uint16_t exp_sz, std::string exp_key, KeyOpFieldsValuesTuple exp_kofv) + { + // verify the content in syncMap + ASSERT_EQ(sync.size(), exp_sz); + auto it = sync.begin(); + while (it != sync.end()) + { + KeyOpFieldsValuesTuple t = it->second; + + string itkey = kfvKey(t); + if (itkey == exp_key) { + ASSERT_EQ(t, exp_kofv); + it = sync.erase(it); + break; + } else { + it++; + } + } + ASSERT_EQ(sync.size(), exp_sz-1); + } + }; + + TEST_F(ConsumerTest, ConsumerAddToSync_Set) + { + + // Test case, one set_command + auto entry = KeyOpFieldsValuesTuple( + { key, + SET_COMMAND, + { { f1, v1a }, + { f2, v2a } } }); + + kofv_q.push_back(entry); + consumer->addToSync(kofv_q); + exp_kofv = entry; + validate_syncmap(consumer->m_toSync, 1, key, exp_kofv); + } + + TEST_F(ConsumerTest, ConsumerAddToSync_Del) + { + // Test case, one with del_command + auto entry = KeyOpFieldsValuesTuple( + { key, + DEL_COMMAND, + { { } } }); + + kofv_q.push_back(entry); + consumer->addToSync(kofv_q); + + exp_kofv = entry; + validate_syncmap(consumer->m_toSync, 1, key, exp_kofv); + + } + + TEST_F(ConsumerTest, ConsumerAddToSync_Set_Del) + { + // Test case, add SET then DEL + auto entrya = KeyOpFieldsValuesTuple( + { key, + SET_COMMAND, + { { f1, v1a }, + { f2, v2a } } }); + + auto entryb = KeyOpFieldsValuesTuple( + { key, + DEL_COMMAND, + { { } } }); + + kofv_q.push_back(entrya); + kofv_q.push_back(entryb); + consumer->addToSync(kofv_q); + + // expect only DEL + exp_kofv = entryb; + validate_syncmap(consumer->m_toSync, 1, key, exp_kofv); + } + + TEST_F(ConsumerTest, ConsumerAddToSync_Del_Set) + { + auto entrya = KeyOpFieldsValuesTuple( + { key, + DEL_COMMAND, + { { } } }); + + auto entryb = KeyOpFieldsValuesTuple( + { key, + SET_COMMAND, + { { f1, v1a }, + { f2, v2a } } }); + + // Test case, add DEL then SET, re-try 100 times, order should be kept + for (auto x = 0; x < 100; x++) + { + kofv_q.push_back(entrya); + kofv_q.push_back(entryb); + consumer->addToSync(kofv_q); + + // expect DEL then SET + exp_kofv = entrya; + validate_syncmap(consumer->m_toSync, 2, key, exp_kofv); + + exp_kofv = entryb; + validate_syncmap(consumer->m_toSync, 1, key, exp_kofv); + } + } + + TEST_F(ConsumerTest, ConsumerAddToSync_Set_Del_Set_Multi) + { + // Test5, add SET, DEL then SET, re-try 100 times , order should be kept + auto entrya = KeyOpFieldsValuesTuple( + { key, + SET_COMMAND, + { { f1, v1a }, + { f2, v2a } } }); + + auto entryb = KeyOpFieldsValuesTuple( + { key, + DEL_COMMAND, + { { } } }); + + auto entryc = KeyOpFieldsValuesTuple( + { key, + SET_COMMAND, + { { f1, v1a }, + { f2, v2a } } }); + + for (auto x = 0; x < 100; x++) + { + kofv_q.push_back(entrya); + kofv_q.push_back(entryb); + kofv_q.push_back(entryc); + consumer->addToSync(kofv_q); + + // expect DEL then SET + exp_kofv = entryb; + validate_syncmap(consumer->m_toSync, 2, key, exp_kofv); + + exp_kofv = entryc; + validate_syncmap(consumer->m_toSync, 1, key, exp_kofv); + } + } + + TEST_F(ConsumerTest, ConsumerAddToSync_Set_Del_Set_Multi_In_Q) + { + // Test5, add SET, DEL then SET, repeat 100 times in queue, final result and order should be kept + auto entrya = KeyOpFieldsValuesTuple( + { key, + SET_COMMAND, + { { f1, v1a }, + { f2, v2a } } }); + + auto entryb = KeyOpFieldsValuesTuple( + { key, + DEL_COMMAND, + { { } } }); + + auto entryc = KeyOpFieldsValuesTuple( + { key, + SET_COMMAND, + { { f1, v1a }, + { f2, v2a } } }); + + for (auto x = 0; x < 100; x++) + { + kofv_q.push_back(entrya); + kofv_q.push_back(entryb); + kofv_q.push_back(entryc); + } + consumer->addToSync(kofv_q); + + // expect DEL then SET + exp_kofv = entryb; + validate_syncmap(consumer->m_toSync, 2, key, exp_kofv); + + exp_kofv = entryc; + validate_syncmap(consumer->m_toSync, 1, key, exp_kofv); + } + + TEST_F(ConsumerTest, ConsumerAddToSync_Del_Set_Setnew) + { + // Test case, DEL, SET, then SET with different value + auto entrya = KeyOpFieldsValuesTuple( + { key, + DEL_COMMAND, + { { } } }); + + auto entryb = KeyOpFieldsValuesTuple( + { key, + SET_COMMAND, + { { f1, v1a }, + { f2, v2a } } }); + + auto entryc = KeyOpFieldsValuesTuple( + { key, + SET_COMMAND, + { { f1, v1b }, + { f2, v2b } } }); + + kofv_q.push_back(entrya); + kofv_q.push_back(entryb); + kofv_q.push_back(entryc); + consumer->addToSync(kofv_q); + + // expect DEL then SET with new values + exp_kofv = entrya; + validate_syncmap(consumer->m_toSync, 2, key, exp_kofv); + + exp_kofv = entryc; + validate_syncmap(consumer->m_toSync, 1, key, exp_kofv); + } + + TEST_F(ConsumerTest, ConsumerAddToSync_Del_Set_Setnew1) + { + // Test case, DEL, SET, then SET with new values and new fields + auto entrya = KeyOpFieldsValuesTuple( + { key, + DEL_COMMAND, + { { } } }); + + auto entryb = KeyOpFieldsValuesTuple( + { key, + SET_COMMAND, + { { f1, v1a }, + { f2, v2a } } }); + + auto entryc = KeyOpFieldsValuesTuple( + { key, + SET_COMMAND, + { { f1, v1b }, + { f3, v3a } } }); + + kofv_q.push_back(entrya); + kofv_q.push_back(entryb); + kofv_q.push_back(entryc); + consumer->addToSync(kofv_q); + + // expect DEL then SET with new values and new fields + exp_kofv = entrya; + validate_syncmap(consumer->m_toSync, 2, key, exp_kofv); + + exp_kofv = KeyOpFieldsValuesTuple( + { key, + SET_COMMAND, + { { f2, v2a }, + { f1, v1b }, + { f3, v3a } } }); + + validate_syncmap(consumer->m_toSync, 1, key, exp_kofv); + } + + TEST_F(ConsumerTest, ConsumerAddToSync_Ind_Set_Del) + { + // Test case, Add individuals by addToSync, SET then DEL + auto entrya = KeyOpFieldsValuesTuple( + { key, + SET_COMMAND, + { { f1, v1a }, + { f2, v2a } } }); + + auto entryb = KeyOpFieldsValuesTuple( + { key, + DEL_COMMAND, + { { } } }); + + consumer->addToSync(entrya); + consumer->addToSync(entryb); + + // expect only DEL + exp_kofv = entryb; + validate_syncmap(consumer->m_toSync, 1, key, exp_kofv); + + } +}