Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix DBInterface blocking operations #505

Merged
merged 2 commits into from
Jul 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion common/dbinterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ bool DBInterface::_unavailable_data_handler(const std::string& dbName, const cha
auto& channel = keyspace_notification_channels.at(dbName);
auto ctx = channel->getContext();
redisReply *reply;
ctx->err = REDIS_OK; // Stop later redisGetReply early return on no data after first redisReply timeout
int rc = redisGetReply(ctx, reinterpret_cast<void**>(&reply));
if (rc == REDIS_ERR && ctx->err == REDIS_ERR_IO && errno == EAGAIN)
{
Expand Down Expand Up @@ -293,7 +294,7 @@ void DBInterface::_subscribe_keyspace_notification(const std::string& dbName)
pubsub->psubscribe(KEYSPACE_PATTERN);

// Set the timeout of the pubsub channel, so future redisGetReply will be impacted
struct timeval tv = { 0, (suseconds_t)(1000 * PUB_SUB_NOTIFICATION_TIMEOUT) };
struct timeval tv = { PUB_SUB_NOTIFICATION_TIMEOUT, 0 };
int rc = redisSetTimeout(pubsub->getContext(), tv);
if (rc != REDIS_OK)
{
Expand Down
30 changes: 15 additions & 15 deletions common/dbinterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,20 +48,6 @@ class DBInterface
DBConnector& get_redis_client(const std::string& dbName);
void set_redis_kwargs(std::string unix_socket_path, std::string host, int port);

private:
template <typename T, typename FUNC>
T blockable(FUNC f, const std::string& dbName, bool blocking = false);
// Unsubscribe the chosent client from keyspace event notifications
void _unsubscribe_keyspace_notification(const std::string& dbName);
bool _unavailable_data_handler(const std::string& dbName, const char *data);
// Subscribe the chosent client to keyspace event notifications
void _subscribe_keyspace_notification(const std::string& dbName);
// In the event Redis is unavailable, close existing connections, and try again.
void _connection_error_handler(const std::string& dbName);
void _onetime_connect(int dbId, const std::string& dbName);
// Keep reconnecting to Database 'dbId' until success
void _persistent_connect(int dbId, const std::string& dbName);

static const int BLOCKING_ATTEMPT_ERROR_THRESHOLD = 10;
static const int BLOCKING_ATTEMPT_SUPPRESSION = BLOCKING_ATTEMPT_ERROR_THRESHOLD + 5;

Expand All @@ -72,11 +58,25 @@ class DBInterface
static const int DATA_RETRIEVAL_WAIT_TIME = 3;

// Time to wait for any given message to arrive via pub-sub.
static constexpr double PUB_SUB_NOTIFICATION_TIMEOUT = 10.0; // seconds
static const int PUB_SUB_NOTIFICATION_TIMEOUT = 10; // seconds

// Maximum allowable time to wait on a specific pub-sub notification.
static constexpr double PUB_SUB_MAXIMUM_DATA_WAIT = 60.0; // seconds

private:
template <typename T, typename FUNC>
T blockable(FUNC f, const std::string& dbName, bool blocking = false);
// Unsubscribe the chosent client from keyspace event notifications
void _unsubscribe_keyspace_notification(const std::string& dbName);
bool _unavailable_data_handler(const std::string& dbName, const char *data);
// Subscribe the chosent client to keyspace event notifications
void _subscribe_keyspace_notification(const std::string& dbName);
// In the event Redis is unavailable, close existing connections, and try again.
void _connection_error_handler(const std::string& dbName);
void _onetime_connect(int dbId, const std::string& dbName);
// Keep reconnecting to Database 'dbId' until success
void _persistent_connect(int dbId, const std::string& dbName);

// Pub-sub keyspace pattern
static constexpr const char *KEYSPACE_PATTERN = "__key*__:*";

Expand Down
20 changes: 19 additions & 1 deletion tests/test_redis_ut.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,15 @@ def generator_SelectMemoryLeak():
assert not cases


def thread_coming_data():
print("Start thread: thread_coming_data")
db = SonicV2Connector(use_unix_socket_path=True)
db.connect("TEST_DB")
time.sleep(DBInterface.PUB_SUB_NOTIFICATION_TIMEOUT * 2)
db.set("TEST_DB", "key0_coming", "field1", "value2")
print("Leave thread: thread_coming_data")


def test_DBInterface():
dbintf = DBInterface()
dbintf.set_redis_kwargs("", "127.0.0.1", 6379)
Expand Down Expand Up @@ -276,13 +285,22 @@ def test_DBInterface():
with pytest.raises(TypeError):
fvs.update(fvs, fvs)

# Test blocking
# Test blocking reading existing data in Redis
fvs = db.get_all("TEST_DB", "key0", blocking=True)
assert "field1" in fvs
assert fvs["field1"] == "value2"
assert fvs.get("field1", "default") == "value2"
assert fvs.get("nonfield", "default") == "default"

# Test blocking reading coming data in Redis
thread = Thread(target=thread_coming_data)
thread.start()
fvs = db.get_all("TEST_DB", "key0_coming", blocking=True)
assert "field1" in fvs
assert fvs["field1"] == "value2"
assert fvs.get("field1", "default") == "value2"
assert fvs.get("nonfield", "default") == "default"

# Test hmset
fvs = {"field1": "value3", "field2": "value4"}
db.hmset("TEST_DB", "key5", fvs)
Expand Down