Skip to content

Commit

Permalink
Fix DBInterface blocking operations (#505)
Browse files Browse the repository at this point in the history
Fixes sonic-net/sonic-buildimage#8202
- Fix pubsub channel timeout unit
- Reset context err before next redisGetReply, otherwise later redisGetReply early return on no data, and burn CPU core in a loop.
  • Loading branch information
qiluo-msft committed Jul 28, 2021
1 parent 0e9385f commit bf8c832
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 17 deletions.
3 changes: 2 additions & 1 deletion common/dbinterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ bool DBInterface::_unavailable_data_handler(const std::string& dbName, const cha
auto& channel = keyspace_notification_channels.at(dbName);
auto ctx = channel->getContext();
redisReply *reply;
ctx->err = REDIS_OK; // Stop later redisGetReply early return on no data after first redisReply timeout
int rc = redisGetReply(ctx, reinterpret_cast<void**>(&reply));
if (rc == REDIS_ERR && ctx->err == REDIS_ERR_IO && errno == EAGAIN)
{
Expand Down Expand Up @@ -293,7 +294,7 @@ void DBInterface::_subscribe_keyspace_notification(const std::string& dbName)
pubsub->psubscribe(KEYSPACE_PATTERN);

// Set the timeout of the pubsub channel, so future redisGetReply will be impacted
struct timeval tv = { 0, (suseconds_t)(1000 * PUB_SUB_NOTIFICATION_TIMEOUT) };
struct timeval tv = { PUB_SUB_NOTIFICATION_TIMEOUT, 0 };
int rc = redisSetTimeout(pubsub->getContext(), tv);
if (rc != REDIS_OK)
{
Expand Down
30 changes: 15 additions & 15 deletions common/dbinterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,20 +48,6 @@ class DBInterface
DBConnector& get_redis_client(const std::string& dbName);
void set_redis_kwargs(std::string unix_socket_path, std::string host, int port);

private:
template <typename T, typename FUNC>
T blockable(FUNC f, const std::string& dbName, bool blocking = false);
// Unsubscribe the chosent client from keyspace event notifications
void _unsubscribe_keyspace_notification(const std::string& dbName);
bool _unavailable_data_handler(const std::string& dbName, const char *data);
// Subscribe the chosent client to keyspace event notifications
void _subscribe_keyspace_notification(const std::string& dbName);
// In the event Redis is unavailable, close existing connections, and try again.
void _connection_error_handler(const std::string& dbName);
void _onetime_connect(int dbId, const std::string& dbName);
// Keep reconnecting to Database 'dbId' until success
void _persistent_connect(int dbId, const std::string& dbName);

static const int BLOCKING_ATTEMPT_ERROR_THRESHOLD = 10;
static const int BLOCKING_ATTEMPT_SUPPRESSION = BLOCKING_ATTEMPT_ERROR_THRESHOLD + 5;

Expand All @@ -72,11 +58,25 @@ class DBInterface
static const int DATA_RETRIEVAL_WAIT_TIME = 3;

// Time to wait for any given message to arrive via pub-sub.
static constexpr double PUB_SUB_NOTIFICATION_TIMEOUT = 10.0; // seconds
static const int PUB_SUB_NOTIFICATION_TIMEOUT = 10; // seconds

// Maximum allowable time to wait on a specific pub-sub notification.
static constexpr double PUB_SUB_MAXIMUM_DATA_WAIT = 60.0; // seconds

private:
template <typename T, typename FUNC>
T blockable(FUNC f, const std::string& dbName, bool blocking = false);
// Unsubscribe the chosent client from keyspace event notifications
void _unsubscribe_keyspace_notification(const std::string& dbName);
bool _unavailable_data_handler(const std::string& dbName, const char *data);
// Subscribe the chosent client to keyspace event notifications
void _subscribe_keyspace_notification(const std::string& dbName);
// In the event Redis is unavailable, close existing connections, and try again.
void _connection_error_handler(const std::string& dbName);
void _onetime_connect(int dbId, const std::string& dbName);
// Keep reconnecting to Database 'dbId' until success
void _persistent_connect(int dbId, const std::string& dbName);

// Pub-sub keyspace pattern
static constexpr const char *KEYSPACE_PATTERN = "__key*__:*";

Expand Down
20 changes: 19 additions & 1 deletion tests/test_redis_ut.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,15 @@ def generator_SelectMemoryLeak():
assert not cases


def thread_coming_data():
print("Start thread: thread_coming_data")
db = SonicV2Connector(use_unix_socket_path=True)
db.connect("TEST_DB")
time.sleep(DBInterface.PUB_SUB_NOTIFICATION_TIMEOUT * 2)
db.set("TEST_DB", "key0_coming", "field1", "value2")
print("Leave thread: thread_coming_data")


def test_DBInterface():
dbintf = DBInterface()
dbintf.set_redis_kwargs("", "127.0.0.1", 6379)
Expand Down Expand Up @@ -254,13 +263,22 @@ def test_DBInterface():
with pytest.raises(TypeError):
fvs.update(fvs, fvs)

# Test blocking
# Test blocking reading existing data in Redis
fvs = db.get_all("TEST_DB", "key0", blocking=True)
assert "field1" in fvs
assert fvs["field1"] == "value2"
assert fvs.get("field1", "default") == "value2"
assert fvs.get("nonfield", "default") == "default"

# Test blocking reading coming data in Redis
thread = Thread(target=thread_coming_data)
thread.start()
fvs = db.get_all("TEST_DB", "key0_coming", blocking=True)
assert "field1" in fvs
assert fvs["field1"] == "value2"
assert fvs.get("field1", "default") == "value2"
assert fvs.get("nonfield", "default") == "default"

# Test hmset
fvs = {"field1": "value3", "field2": "value4"}
db.hmset("TEST_DB", "key5", fvs)
Expand Down

0 comments on commit bf8c832

Please sign in to comment.