diff --git a/src/yb/client/client-internal.cc b/src/yb/client/client-internal.cc index e65443cca2e5..f55fbeba993d 100644 --- a/src/yb/client/client-internal.cc +++ b/src/yb/client/client-internal.cc @@ -596,14 +596,16 @@ Status YBClient::Data::WaitForDeleteTableToFinish(YBClient* client, std::bind(&YBClient::Data::IsDeleteTableInProgress, this, client, deleted_table_id, _1, _2)); } -Status YBClient::Data::TruncateTable(YBClient* client, - const string& table_id, +Status YBClient::Data::TruncateTables(YBClient* client, + const vector& table_ids, const MonoTime& deadline, bool wait) { TruncateTableRequestPB req; TruncateTableResponsePB resp; - req.set_table_id(table_id); + for (const auto& table_id : table_ids) { + req.add_table_ids(table_id); + } RETURN_NOT_OK((SyncLeaderMasterRpc( deadline, client, req, &resp, nullptr /* num_attempts */, "TruncateTable", &MasterServiceProxy::TruncateTable))); @@ -613,10 +615,12 @@ Status YBClient::Data::TruncateTable(YBClient* client, // Spin until the table is fully truncated, if requested. if (wait) { - RETURN_NOT_OK(WaitForTruncateTableToFinish(client, table_id, deadline)); + for (const auto& table_id : table_ids) { + RETURN_NOT_OK(WaitForTruncateTableToFinish(client, table_id, deadline)); + } } - LOG(INFO) << "Truncated table " << table_id; + LOG(INFO) << "Truncated table(s) " << JoinStrings(table_ids, ","); return Status::OK(); } diff --git a/src/yb/client/client-internal.h b/src/yb/client/client-internal.h index 9b47628fa676..6be36d66b3fd 100644 --- a/src/yb/client/client-internal.h +++ b/src/yb/client/client-internal.h @@ -114,10 +114,10 @@ class YBClient::Data { const std::string& deleted_table_id, const MonoTime& deadline); - CHECKED_STATUS TruncateTable(YBClient* client, - const std::string& table_id, - const MonoTime& deadline, - bool wait = true); + CHECKED_STATUS TruncateTables(YBClient* client, + const std::vector& table_ids, + const MonoTime& deadline, + bool wait = true); CHECKED_STATUS IsTruncateTableInProgress(YBClient* client, const std::string& table_id, diff --git a/src/yb/client/client.cc b/src/yb/client/client.cc index 02acd994dc09..879f42adb8a2 100644 --- a/src/yb/client/client.cc +++ b/src/yb/client/client.cc @@ -139,10 +139,8 @@ using std::string; using std::vector; using google::protobuf::RepeatedPtrField; -using namespace yb::size_literals; +using namespace yb::size_literals; // NOLINT. -DEFINE_int32(redis_password_caching_duration_ms, 5000, - "The duration for which we will cache the redis passwords. 0 to disable."); DEFINE_test_flag(int32, yb_num_total_tablets, 0, "The total number of tablets per table when a table is created."); DECLARE_int32(yb_num_shards_per_tserver); @@ -406,8 +404,12 @@ Status YBClient::IsCreateTableInProgress(const YBTableName& table_name, } Status YBClient::TruncateTable(const string& table_id, bool wait) { + return TruncateTables({table_id}, wait); +} + +Status YBClient::TruncateTables(const vector& table_ids, bool wait) { MonoTime deadline = MonoTime::Now() + default_admin_operation_timeout(); - return data_->TruncateTable(this, table_id, deadline, wait); + return data_->TruncateTables(this, table_ids, deadline, wait); } Status YBClient::DeleteTable(const YBTableName& table_name, bool wait) { @@ -628,6 +630,16 @@ CHECKED_STATUS YBClient::SetRedisPasswords(const std::vector& passwords) return SetRedisConfig(kRequirePass, passwords); } +CHECKED_STATUS YBClient::GetRedisPasswords(vector* passwords) { + Status s = GetRedisConfig(kRequirePass, passwords); + if (s.IsNotFound()) { + // If the redis config has no kRequirePass key. + passwords->clear(); + s = Status::OK(); + } + return s; +} + CHECKED_STATUS YBClient::SetRedisConfig(const string& key, const vector& values) { // Setting up request. RedisConfigSetRequestPB req; @@ -652,29 +664,6 @@ CHECKED_STATUS YBClient::GetRedisConfig(const string& key, vector* value return Status::OK(); } -CHECKED_STATUS YBClient::GetRedisPasswords(vector* passwords) { - MonoTime now = MonoTime::Now(); - { - std::lock_guard lock(redis_password_mutex_); - if (redis_cached_password_validity_expiry_.Initialized() && - now < redis_cached_password_validity_expiry_) { - *passwords = redis_cached_passwords_; - return Status::OK(); - } - - redis_cached_password_validity_expiry_ = - now + MonoDelta::FromMilliseconds(FLAGS_redis_password_caching_duration_ms); - Status s = GetRedisConfig(kRequirePass, &redis_cached_passwords_); - if (s.IsNotFound()) { - // If the redis config has no kRequirePass key. - redis_cached_passwords_.clear(); - s = Status::OK(); - } - *passwords = redis_cached_passwords_; - return s; - } -} - CHECKED_STATUS YBClient::GrantRole(const std::string& granted_role_name, const std::string& recipient_role_name) { // Setting up request. diff --git a/src/yb/client/client.h b/src/yb/client/client.h index d625a2f005c0..c08c2ec709cb 100644 --- a/src/yb/client/client.h +++ b/src/yb/client/client.h @@ -273,6 +273,7 @@ class YBClient : public std::enable_shared_from_this { // Truncate the specified table. // Set 'wait' to true if the call must wait for the table to be fully truncated before returning. CHECKED_STATUS TruncateTable(const std::string& table_id, bool wait = true); + CHECKED_STATUS TruncateTables(const std::vector& table_ids, bool wait = true); // Delete the specified table. // Set 'wait' to true if the call must wait for the table to be fully deleted before returning. @@ -544,10 +545,6 @@ class YBClient : public std::enable_shared_from_this { // same client to the same data. const std::string client_id_; - std::mutex redis_password_mutex_; - MonoTime redis_cached_password_validity_expiry_; - vector redis_cached_passwords_; - DISALLOW_COPY_AND_ASSIGN(YBClient); }; diff --git a/src/yb/client/yb_table_name.h b/src/yb/client/yb_table_name.h index e98cd1b99a4b..557a9daeef72 100644 --- a/src/yb/client/yb_table_name.h +++ b/src/yb/client/yb_table_name.h @@ -101,8 +101,9 @@ class YBTableName { } bool is_redis_table() const { - return ((has_namespace() && resolved_namespace_name() == common::kRedisKeyspaceName) && - table_name_ == common::kRedisTableName); + return ( + (has_namespace() && resolved_namespace_name() == common::kRedisKeyspaceName) && + table_name_.find(common::kRedisTableName) == 0); } std::string ToString() const { diff --git a/src/yb/master/catalog_manager.cc b/src/yb/master/catalog_manager.cc index 78cd03c180c2..9acc12ec9e1a 100644 --- a/src/yb/master/catalog_manager.cc +++ b/src/yb/master/catalog_manager.cc @@ -2099,27 +2099,30 @@ Status CatalogManager::TruncateTable(const TruncateTableRequestPB* req, RETURN_NOT_OK(CheckOnline()); - // Lookup the table and verify if it exists. - TRACE("Looking up table"); - scoped_refptr table = FindPtrOrNull(table_ids_map_, req->table_id()); - if (table == nullptr) { - Status s = STATUS(NotFound, "The table does not exist"); - return SetupError(resp->mutable_error(), MasterErrorPB::TABLE_NOT_FOUND, s); - } + for (int i = 0; i < req->table_ids_size(); i++) { + const auto& table_id = req->table_ids(i); + // Lookup the table and verify if it exists. + TRACE("Looking up table"); + scoped_refptr table = FindPtrOrNull(table_ids_map_, table_id); + if (table == nullptr) { + Status s = STATUS(NotFound, Substitute("The table for table_id $0 does not exist", table_id)); + return SetupError(resp->mutable_error(), MasterErrorPB::TABLE_NOT_FOUND, s); + } - TRACE("Locking table"); - auto l = table->LockForRead(); - if (l->data().started_deleting() || l->data().is_deleted()) { - Status s = STATUS(NotFound, "The table does not exist"); - return SetupError(resp->mutable_error(), MasterErrorPB::TABLE_NOT_FOUND, s); - } + TRACE("Locking table"); + auto l = table->LockForRead(); + if (l->data().started_deleting() || l->data().is_deleted()) { + Status s = STATUS(NotFound, Substitute("The table $0 does not exist", table->ToString())); + return SetupError(resp->mutable_error(), MasterErrorPB::TABLE_NOT_FOUND, s); + } - // Send a Truncate() request to each tablet in the table. - SendTruncateTableRequest(table); + // Send a Truncate() request to each tablet in the table. + SendTruncateTableRequest(table); - LOG(INFO) << "Successfully initiated TRUNCATE for " << table->ToString() - << " per request from " << RequestorString(rpc); - background_tasks_->Wake(); + LOG(INFO) << "Successfully initiated TRUNCATE for " << table->ToString() << " per request from " + << RequestorString(rpc); + background_tasks_->Wake(); + } return Status::OK(); } diff --git a/src/yb/master/master.proto b/src/yb/master/master.proto index 733cf7ac9961..a7aad55b8d1f 100644 --- a/src/yb/master/master.proto +++ b/src/yb/master/master.proto @@ -632,7 +632,7 @@ message IsCreateTableDoneResponsePB { } message TruncateTableRequestPB { - optional bytes table_id = 1; + repeated bytes table_ids = 1; } message TruncateTableResponsePB { diff --git a/src/yb/yql/redis/redisserver/redis_commands.cc b/src/yb/yql/redis/redisserver/redis_commands.cc index a72c96d92248..d5bb826fa681 100644 --- a/src/yb/yql/redis/redisserver/redis_commands.cc +++ b/src/yb/yql/redis/redisserver/redis_commands.cc @@ -110,6 +110,10 @@ namespace redisserver { ((config, Config, -1, LOCAL)) \ ((info, Info, -1, LOCAL)) \ ((role, Role, 1, LOCAL)) \ + ((select, Select, 2, LOCAL)) \ + ((createdb, CreateDB, 2, LOCAL)) \ + ((listdb, ListDB, 1, LOCAL)) \ + ((deletedb, DeleteDB, 2, LOCAL)) \ ((ping, Ping, -1, LOCAL)) \ ((command, Command, -1, LOCAL)) \ ((monitor, Monitor, 1, LOCAL)) \ @@ -247,11 +251,20 @@ class LocalCommandData { BatchContextPtr context_; }; +string GetYBTableNameForRedisDatabase(const string& db_name) { + if (db_name == "0") { + return common::kRedisTableName; + } else { + return StrCat(common::kRedisTableName, "_", db_name); + } +} void GetTabletLocations(LocalCommandData data, RedisArrayPB* array_response) { vector tablets, partitions; vector locations; - const YBTableName table_name(common::kRedisKeyspaceName, common::kRedisTableName); + const auto redis_table_name = + GetYBTableNameForRedisDatabase(data.call()->connection_context().redis_db_to_use()); + const YBTableName table_name(common::kRedisKeyspaceName, redis_table_name); auto s = data.client()->GetTablets(table_name, 0, &tablets, &partitions, &locations, true /* update tablets cache */); if (!s.ok()) { @@ -333,6 +346,10 @@ void HandleEcho(LocalCommandData data) { void HandleMonitor(LocalCommandData data) { data.Respond(); + + // Add to the appenders after the call has been handled (i.e. reponded with "OK"). + auto conn = data.call()->connection(); + data.context()->service_data()->AppendToMonitors(conn); } void HandleRole(LocalCommandData data) { @@ -429,11 +446,14 @@ void HandleConfig(LocalCommandData data) { void HandleAuth(LocalCommandData data) { vector passwords; - auto status = data.client()->GetRedisPasswords(&passwords); + auto status = data.context()->service_data()->GetRedisPasswords(&passwords); RedisResponsePB resp; if (!status.ok() || !AcceptPassword(passwords, data.arg(1).ToBuffer())) { resp.set_code(RedisResponsePB_RedisStatusCode_SERVER_ERROR); - resp.set_error_message(strings::Substitute("ERR: Bad Password. $0", status.ToString())); + auto error_message = + (status.ok() ? "ERR: Bad Password." + : strings::Substitute("ERR: Bad Password. $0", status.ToString())); + resp.set_error_message(error_message); } else { RedisConnectionContext& context = data.call()->connection_context(); context.set_authenticated(true); @@ -442,12 +462,12 @@ void HandleAuth(LocalCommandData data) { data.Respond(&resp); } -void HandleFlushDB(LocalCommandData data) { +void FlushDBs(LocalCommandData data, const vector ids) { RedisResponsePB resp; - const Status s = FLAGS_yedis_enable_flush ? - data.client()->TruncateTable(data.table()->id()) : - STATUS(InvalidArgument, "FLUSHDB and FLUSHALL are not enabled."); + const Status s = FLAGS_yedis_enable_flush + ? data.client()->TruncateTables(ids) + : STATUS(InvalidArgument, "FLUSHDB and FLUSHALL are not enabled."); if (s.ok()) { resp.set_code(RedisResponsePB_RedisStatusCode_OK); @@ -459,8 +479,139 @@ void HandleFlushDB(LocalCommandData data) { data.Respond(&resp); } +void HandleFlushDB(LocalCommandData data) { + FlushDBs(data, {data.table()->id()}); +} + void HandleFlushAll(LocalCommandData data) { - HandleFlushDB(data); + vector table_names; + const string prefix = common::kRedisTableName; + Status s = data.client()->ListTables(&table_names, prefix); + if (!s.ok()) { + RedisResponsePB resp; + const Slice message = s.message(); + resp.set_code(RedisResponsePB_RedisStatusCode_SERVER_ERROR); + resp.set_error_message(message.data(), message.size()); + data.Respond(&resp); + return; + } + // Gather table ids. + vector table_ids; + for (const auto& name : table_names) { + std::shared_ptr table; + s = data.client()->OpenTable(name, &table); + if (!s.ok()) { + RedisResponsePB resp; + const Slice message = s.message(); + resp.set_code(RedisResponsePB_RedisStatusCode_SERVER_ERROR); + resp.set_error_message(message.data(), message.size()); + data.Respond(&resp); + return; + } + table_ids.push_back(table->id()); + } + FlushDBs(data, table_ids); +} + +void HandleCreateDB(LocalCommandData data) { + RedisResponsePB resp; + // Figure out the redis table name that we should be using. + const string db_name = data.arg(1).ToBuffer(); + const string redis_table_name = GetYBTableNameForRedisDatabase(db_name); + YBTableName table_name(common::kRedisKeyspaceName, redis_table_name); + + gscoped_ptr table_creator(data.client()->NewTableCreator()); + Status s = table_creator->table_name(table_name) + .table_type(yb::client::YBTableType::REDIS_TABLE_TYPE) + .Create(); + if (s.ok()) { + resp.set_code(RedisResponsePB_RedisStatusCode_OK); + } else if (s.IsAlreadyPresent()) { + VLOG(1) << "Table '" << table_name.ToString() << "' already exists"; + resp.set_code(RedisResponsePB_RedisStatusCode_OK); + } else { + const Slice message = s.message(); + resp.set_code(RedisResponsePB_RedisStatusCode_SERVER_ERROR); + resp.set_error_message(message.data(), message.size()); + } + data.Respond(&resp); +} + +void HandleListDB(LocalCommandData data) { + RedisResponsePB resp; + // Figure out the redis table name that we should be using. + vector table_names; + const string prefix = common::kRedisTableName; + const size_t prefix_len = strlen(common::kRedisTableName); + Status s = data.client()->ListTables(&table_names, prefix); + if (!s.ok()) { + const Slice message = s.message(); + resp.set_code(RedisResponsePB_RedisStatusCode_SERVER_ERROR); + resp.set_error_message(message.data(), message.size()); + data.Respond(&resp); + return; + } + + auto array_response = resp.mutable_array_response(); + vector dbs; + for (const auto& ybname : table_names) { + if (!ybname.is_redis_table()) continue; + const auto& tablename = ybname.table_name(); + if (tablename == common::kRedisTableName) { + dbs.push_back("0"); + } else { + // Of the form _. + dbs.push_back(tablename.substr(prefix_len + 1)); + } + } + std::sort(dbs.begin(), dbs.end()); + for (const string& db : dbs) { + AddElements(redisserver::EncodeAsBulkString(db), array_response); + } + array_response->set_encoded(true); + resp.set_code(RedisResponsePB::OK); + data.Respond(&resp); +} + +void HandleDeleteDB(LocalCommandData data) { + RedisResponsePB resp; + // Figure out the redis table name that we should be using. + const string db_name = data.arg(1).ToBuffer(); + const string redis_table_name = GetYBTableNameForRedisDatabase(db_name); + YBTableName table_name(common::kRedisKeyspaceName, redis_table_name); + + Status s = data.client()->DeleteTable(table_name, /* wait */ true); + if (s.ok()) { + resp.set_code(RedisResponsePB_RedisStatusCode_OK); + } else if (s.IsNotFound()) { + VLOG(1) << "Table '" << table_name.ToString() << "' does not exist."; + resp.set_code(RedisResponsePB_RedisStatusCode_OK); + } else { + const Slice message = s.message(); + resp.set_code(RedisResponsePB_RedisStatusCode_SERVER_ERROR); + resp.set_error_message(message.data(), message.size()); + } + data.Respond(&resp); +} + +void HandleSelect(LocalCommandData data) { + RedisResponsePB resp; + const string db_name = data.arg(1).ToBuffer(); + RedisServiceData* sd = data.context()->service_data(); + auto s = sd->OpenYBTableForDB(db_name); + if (s.ok()) { + // Update RedisConnectionContext to use the specified table. + RedisConnectionContext& context = data.call()->connection_context(); + context.use_redis_db(db_name); + resp.set_code(RedisResponsePB_RedisStatusCode_OK); + } else { + const Slice message = s.message(); + VLOG(1) << " Could not open Redis Table for db " << db_name << " : " << message.ToString(); + resp.set_code(RedisResponsePB_RedisStatusCode_SERVER_ERROR); + resp.set_error_message(message.data(), message.size()); + data.call()->MarkForClose(); + } + data.Respond(&resp); } void HandleDebugSleep(LocalCommandData data) { diff --git a/src/yb/yql/redis/redisserver/redis_commands.h b/src/yb/yql/redis/redisserver/redis_commands.h index b49533432641..c5a9ae6d013d 100644 --- a/src/yb/yql/redis/redisserver/redis_commands.h +++ b/src/yb/yql/redis/redisserver/redis_commands.h @@ -30,6 +30,26 @@ namespace yb { namespace redisserver { +typedef boost::function StatusFunctor; + +class RedisConnectionContext; + +class RedisServiceData { + public: + // Used for Monitor. + virtual void AppendToMonitors(std::shared_ptr conn) = 0; + virtual void LogToMonitors( + const string& end, const string& db, const RedisClientCommand& cmd) = 0; + + // Used for Auth. + virtual CHECKED_STATUS GetRedisPasswords(vector* passwords) = 0; + + // Used for Select. + virtual CHECKED_STATUS OpenYBTableForDB(const string& db_name) = 0; + + virtual ~RedisServiceData() {} +}; + // Context for batch of Redis commands. class BatchContext : public RefCountedThreadSafe { public: @@ -38,6 +58,7 @@ class BatchContext : public RefCountedThreadSafe { virtual const std::shared_ptr& call() const = 0; virtual const std::shared_ptr& client() const = 0; virtual const RedisServer* server() = 0; + virtual RedisServiceData* service_data() = 0; virtual void Apply( size_t index, diff --git a/src/yb/yql/redis/redisserver/redis_rpc.h b/src/yb/yql/redis/redisserver/redis_rpc.h index f7553a2f89c4..442892fb6ffe 100644 --- a/src/yb/yql/redis/redisserver/redis_rpc.h +++ b/src/yb/yql/redis/redisserver/redis_rpc.h @@ -45,6 +45,13 @@ class RedisConnectionContext : public rpc::ConnectionContextWithQueue { authenticated_.store(flag, std::memory_order_release); } + std::string redis_db_to_use() const { + return redis_db_name_; + } + + void use_redis_db(const std::string& name) { + redis_db_name_ = name; + } static std::string Name() { return "Redis"; } @@ -69,6 +76,7 @@ class RedisConnectionContext : public rpc::ConnectionContextWithQueue { size_t commands_in_batch_ = 0; size_t end_of_batch_ = 0; std::atomic authenticated_{false}; + std::string redis_db_name_ = "0"; MemTrackerPtr call_mem_tracker_; }; diff --git a/src/yb/yql/redis/redisserver/redis_service.cc b/src/yb/yql/redis/redisserver/redis_service.cc index 2bdf575d2313..9dbd3fb7ccb1 100644 --- a/src/yb/yql/redis/redisserver/redis_service.cc +++ b/src/yb/yql/redis/redisserver/redis_service.cc @@ -107,6 +107,9 @@ DEFINE_int32(redis_max_value_size, 64_MB, DEFINE_int32(redis_callbacks_threadpool_size, 64, "The maximum size for the threadpool which handles callbacks from the ybclient layer"); +DEFINE_int32(redis_password_caching_duration_ms, 5000, + "The duration for which we will cache the redis passwords. 0 to disable."); + DEFINE_bool(redis_safe_batch, true, "Use safe batching with Redis service"); DEFINE_bool(enable_redis_auth, true, "Enable AUTH for the Redis service"); @@ -121,6 +124,7 @@ using yb::client::YBClientBuilder; using yb::client::YBSchema; using yb::client::YBSession; using yb::client::YBStatusCallback; +using yb::client::YBTable; using yb::client::YBTableName; using yb::rpc::ConnectionPtr; using yb::rpc::ConnectionWeakPtr; @@ -658,18 +662,65 @@ class TabletOperations { OperationType last_conflict_type_ = OperationType::kNone; }; +string GetYBTableNameForRedisDatabase(const string& db_name) { + if (db_name == "0") { + return common::kRedisTableName; + } else { + return StrCat(common::kRedisTableName, "_", db_name); + } +} + +struct RedisServiceImplData : public RedisServiceData { + RedisServiceImplData(RedisServer* server, string&& yb_tier_master_addresses); + + constexpr static int kRpcTimeoutSec = 5; + + void AppendToMonitors(ConnectionPtr conn) override; + void LogToMonitors(const string& end, const string& db, const RedisClientCommand& cmd) override; + CHECKED_STATUS OpenYBTableForDB(const string& db_name) override; + + std::shared_ptr GetYBTableCached(const string& db_name); + CHECKED_STATUS GetRedisPasswords(vector* passwords) override; + CHECKED_STATUS Initialize(); + + bool initialized() const { return yb_client_initialized_.load(std::memory_order_relaxed); } + + std::string yb_tier_master_addresses_; + + yb::rpc::RpcMethodMetrics metrics_error_; + InternalMetrics metrics_internal_; + + // Mutex that protects the creation of client_ and populating db_to_opened_table_. + rw_spinlock yb_mutex_; + std::atomic yb_client_initialized_; + std::shared_ptr client_; + SessionPool session_pool_; + std::unordered_map> db_to_opened_table_; + + std::vector monitoring_clients_; + scoped_refptr> num_clients_monitoring_; + rw_spinlock monitoring_clients_mutex_; + + std::mutex redis_password_mutex_; + MonoTime redis_cached_password_validity_expiry_; + vector redis_cached_passwords_; + + RedisServer* server_; + +}; + class BatchContextImpl : public BatchContext { public: BatchContextImpl( const std::shared_ptr& client, - const RedisServer* server, + RedisServiceImplData* impl_data, const std::shared_ptr& table, SessionPool* session_pool, const std::shared_ptr& call, const InternalMetrics& metrics_internal, const MemTrackerPtr& mem_tracker) : client_(client), - server_(server), + impl_data_(impl_data), table_(table), session_pool_(session_pool), call_(call), @@ -692,8 +743,12 @@ class BatchContextImpl : public BatchContext { return client_; } + RedisServiceImplData* service_data() override { + return impl_data_; + } + const RedisServer* server() override { - return server_; + return impl_data_->server_; } const std::shared_ptr& table() const override { @@ -782,7 +837,7 @@ class BatchContextImpl : public BatchContext { } std::shared_ptr client_; - const RedisServer* server_ = nullptr; + RedisServiceImplData* impl_data_ = nullptr; std::shared_ptr table_; SessionPool* session_pool_; std::shared_ptr call_; @@ -835,60 +890,67 @@ class RedisServiceImpl::Impl { return true; } - vector GetRedisPasswords() { - vector ret; - CHECK_OK(client_->GetRedisPasswords(&ret)); - return ret; - } - bool CheckAuthentication(RedisConnectionContext* conn_context) { if (!conn_context->is_authenticated()) { - conn_context->set_authenticated(!FLAGS_enable_redis_auth || GetRedisPasswords().empty()); + vector passwords; + Status s = data_.GetRedisPasswords(&passwords); + conn_context->set_authenticated(!FLAGS_enable_redis_auth || (s.ok() && passwords.empty())); } return conn_context->is_authenticated(); } - constexpr static int kRpcTimeoutSec = 5; - void PopulateHandlers(); - void AppendToMonitors(ConnectionPtr conn); - void LogToMonitors(const string& end, int db, const RedisClientCommand& cmd); // Fetches the appropriate handler for the command, nullptr if none exists. const RedisCommandInfo* FetchHandler(const RedisClientCommand& cmd_args); - CHECKED_STATUS SetUpYBClient(); std::deque names_; std::unordered_map command_name_to_info_map_; - yb::rpc::RpcMethodMetrics metrics_error_; - InternalMetrics metrics_internal_; - std::string yb_tier_master_addresses_; - std::mutex yb_mutex_; // Mutex that protects the creation of client_ and table_. - std::atomic yb_client_initialized_; - std::shared_ptr client_; - SessionPool session_pool_; - std::shared_ptr table_; + RedisServiceImplData data_; +}; - std::vector monitoring_clients_; - scoped_refptr> num_clients_monitoring_; - rw_spinlock monitoring_clients_mutex_; +RedisServiceImplData::RedisServiceImplData(RedisServer* server, string&& yb_tier_master_addresses) + : yb_tier_master_addresses_(std::move(yb_tier_master_addresses)), + yb_client_initialized_(false), + server_(server) {} - RedisServer* server_; -}; +Status RedisServiceImplData::OpenYBTableForDB(const string& db_name) { + { + boost::shared_lock rguard(yb_mutex_); + if (db_to_opened_table_.find(db_name) != db_to_opened_table_.end()) { + return Status::OK(); + } + } + + std::shared_ptr table; + YBTableName table_name(common::kRedisKeyspaceName, GetYBTableNameForRedisDatabase(db_name)); + RETURN_NOT_OK(client_->OpenTable(table_name, &table)); + { + boost::lock_guard guard(yb_mutex_); + db_to_opened_table_[db_name] = table; + } + return Status::OK(); +} + +std::shared_ptr RedisServiceImplData::GetYBTableCached(const string& db_name) { + boost::shared_lock rguard(yb_mutex_); + return db_to_opened_table_[db_name]; +} -void RedisServiceImpl::Impl::AppendToMonitors(ConnectionPtr conn) { +void RedisServiceImplData::AppendToMonitors(ConnectionPtr conn) { boost::lock_guard lock(monitoring_clients_mutex_); monitoring_clients_.emplace_back(conn); num_clients_monitoring_->IncrementBy(1); } -void RedisServiceImpl::Impl::LogToMonitors( - const string& end, int db, const RedisClientCommand& cmd) { +void RedisServiceImplData::LogToMonitors( + const string& end, const string& db, const RedisClientCommand& cmd) { boost::shared_lock rlock(monitoring_clients_mutex_); if (monitoring_clients_.empty()) return; // Prepare the string to be sent to all the monitoring clients. + // TODO: Use timestamp that works with converter. int64_t now_ms = ToMicroseconds(CoarseMonoClock::Now().time_since_epoch()); std::stringstream ss; ss << "+"; @@ -928,50 +990,9 @@ void RedisServiceImpl::Impl::LogToMonitors( } } -void RedisServiceImpl::Impl::PopulateHandlers() { - auto metric_entity = server_->metric_entity(); - FillRedisCommands(metric_entity, std::bind(&Impl::SetupMethod, this, _1)); - - // Set up metrics for erroneous calls. - metrics_error_.handler_latency = YB_REDIS_METRIC(error).Instantiate(metric_entity); - metrics_internal_[static_cast(OperationType::kWrite)].handler_latency = - YB_REDIS_METRIC(set_internal).Instantiate(metric_entity); - metrics_internal_[static_cast(OperationType::kRead)].handler_latency = - YB_REDIS_METRIC(get_internal).Instantiate(metric_entity); - metrics_internal_[static_cast(OperationType::kLocal)].handler_latency = - metrics_internal_[static_cast(OperationType::kRead)].handler_latency; - - auto* proto = &METRIC_redis_monitoring_clients; - num_clients_monitoring_ = proto->Instantiate(metric_entity, 0); -} - -const RedisCommandInfo* RedisServiceImpl::Impl::FetchHandler(const RedisClientCommand& cmd_args) { - if (cmd_args.size() < 1) { - return nullptr; - } - Slice cmd_name = cmd_args[0]; - auto iter = command_name_to_info_map_.find(cmd_args[0]); - if (iter == command_name_to_info_map_.end()) { - YB_LOG_EVERY_N_SECS(ERROR, 60) - << "Command " << cmd_name << " not yet supported. " - << "Arguments: " << ToString(cmd_args) << ". " - << "Raw: " << Slice(cmd_args[0].data(), cmd_args.back().end()).ToDebugString(); - return nullptr; - } - return iter->second.get(); -} - -RedisServiceImpl::Impl::Impl(RedisServer* server, string yb_tier_master_addresses) - : yb_tier_master_addresses_(std::move(yb_tier_master_addresses)), - yb_client_initialized_(false), - server_(server) { - // TODO(ENG-446): Handle metrics for all the methods individually. - PopulateHandlers(); -} - -Status RedisServiceImpl::Impl::SetUpYBClient() { - std::lock_guard guard(yb_mutex_); - if (!yb_client_initialized_.load(std::memory_order_relaxed)) { +Status RedisServiceImplData::Initialize() { + boost::lock_guard guard(yb_mutex_); + if (!initialized()) { YBClientBuilder client_builder; client_builder.set_client_name("redis_ybclient"); client_builder.default_rpc_timeout(MonoDelta::FromSeconds(kRpcTimeoutSec)); @@ -1000,8 +1021,10 @@ Status RedisServiceImpl::Impl::SetUpYBClient() { server_->tserver()->permanent_uuid(), server_->tserver()->proxy()); } - const YBTableName table_name(common::kRedisKeyspaceName, common::kRedisTableName); - RETURN_NOT_OK(client_->OpenTable(table_name, &table_)); + std::shared_ptr table; + YBTableName table_name(common::kRedisKeyspaceName, GetYBTableNameForRedisDatabase("0")); + RETURN_NOT_OK(client_->OpenTable(table_name, &table)); + db_to_opened_table_["0"] = table; session_pool_.Init(client_, server_->metric_entity()); @@ -1010,6 +1033,61 @@ Status RedisServiceImpl::Impl::SetUpYBClient() { return Status::OK(); } +Status RedisServiceImplData::GetRedisPasswords(vector* passwords) { + MonoTime now = MonoTime::Now(); + + std::lock_guard lock(redis_password_mutex_); + if (redis_cached_password_validity_expiry_.Initialized() && + now < redis_cached_password_validity_expiry_) { + *passwords = redis_cached_passwords_; + return Status::OK(); + } + + RETURN_NOT_OK(client_->GetRedisPasswords(&redis_cached_passwords_)); + *passwords = redis_cached_passwords_; + redis_cached_password_validity_expiry_ = + now + MonoDelta::FromMilliseconds(FLAGS_redis_password_caching_duration_ms); + return Status::OK(); +} + +void RedisServiceImpl::Impl::PopulateHandlers() { + auto metric_entity = data_.server_->metric_entity(); + FillRedisCommands(metric_entity, std::bind(&Impl::SetupMethod, this, _1)); + + // Set up metrics for erroneous calls. + data_.metrics_error_.handler_latency = YB_REDIS_METRIC(error).Instantiate(metric_entity); + data_.metrics_internal_[static_cast(OperationType::kWrite)].handler_latency = + YB_REDIS_METRIC(set_internal).Instantiate(metric_entity); + data_.metrics_internal_[static_cast(OperationType::kRead)].handler_latency = + YB_REDIS_METRIC(get_internal).Instantiate(metric_entity); + data_.metrics_internal_[static_cast(OperationType::kLocal)].handler_latency = + data_.metrics_internal_[static_cast(OperationType::kRead)].handler_latency; + + auto* proto = &METRIC_redis_monitoring_clients; + data_.num_clients_monitoring_ = proto->Instantiate(metric_entity, 0); +} + +const RedisCommandInfo* RedisServiceImpl::Impl::FetchHandler(const RedisClientCommand& cmd_args) { + if (cmd_args.size() < 1) { + return nullptr; + } + Slice cmd_name = cmd_args[0]; + auto iter = command_name_to_info_map_.find(cmd_args[0]); + if (iter == command_name_to_info_map_.end()) { + YB_LOG_EVERY_N_SECS(ERROR, 60) + << "Command " << cmd_name << " not yet supported. " + << "Arguments: " << ToString(cmd_args) << ". " + << "Raw: " << Slice(cmd_args[0].data(), cmd_args.back().end()).ToDebugString(); + return nullptr; + } + return iter->second.get(); +} + +RedisServiceImpl::Impl::Impl(RedisServer* server, string yb_tier_master_addresses) + : data_(server, std::move(yb_tier_master_addresses)) { + PopulateHandlers(); +} + void RedisServiceImpl::Impl::Handle(rpc::InboundCallPtr call_ptr) { auto call = std::static_pointer_cast(call_ptr); @@ -1024,8 +1102,8 @@ void RedisServiceImpl::Impl::Handle(rpc::InboundCallPtr call_ptr) { } // Ensure that we have the required YBClient(s) initialized. - if (!yb_client_initialized_.load(std::memory_order_acquire)) { - auto status = SetUpYBClient(); + if (!data_.initialized()) { + auto status = data_.Initialize(); if (!status.ok()) { auto message = StrCat("Could not open .redis table. ", status.ToString()); for (size_t idx = 0; idx != call->client_batch().size(); ++idx) { @@ -1039,13 +1117,15 @@ void RedisServiceImpl::Impl::Handle(rpc::InboundCallPtr call_ptr) { // We process them as follows: // Each read commands are processed individually. // Sequential write commands use single session and the same batcher. - auto context = make_scoped_refptr( - client_, server_, table_, &session_pool_, call, metrics_internal_, - server_->mem_tracker()); const auto& batch = call->client_batch(); auto conn = call->connection(); const string remote = yb::ToString(conn->remote()); RedisConnectionContext* conn_context = &(call->connection_context()); + string db_name = conn_context->redis_db_to_use(); + auto yb_table = data_.GetYBTableCached(db_name); + auto context = make_scoped_refptr( + data_.client_, &data_, yb_table, &data_.session_pool_, call, data_.metrics_internal_, + data_.server_->mem_tracker()); for (size_t idx = 0; idx != batch.size(); ++idx) { const RedisClientCommand& c = batch[idx]; @@ -1077,15 +1157,22 @@ void RedisServiceImpl::Impl::Handle(rpc::InboundCallPtr call_ptr) { } else if (!CheckAuthentication(conn_context) && cmd_info->name != "auth") { RespondWithFailure(call, idx, "Authentication required.", "NOAUTH"); } else { + if (cmd_info->name != "config" && cmd_info->name != "monitor") { + data_.LogToMonitors(remote, db_name, c); + } + // Handle the call. cmd_info->functor(*cmd_info, idx, context.get()); - } - // Add to the appenders after the call has been handled (i.e. reponded with "OK"). - if (cmd_info->name == "monitor") { - AppendToMonitors(conn); - } else if (cmd_info->name != "config") { - LogToMonitors(remote, 0, c); + if (cmd_info->name == "select" && db_name != conn_context->redis_db_to_use()) { + // update context. + context->Commit(); + db_name = conn_context->redis_db_to_use(); + yb_table = data_.GetYBTableCached(db_name); + context = make_scoped_refptr( + data_.client_, &data_, yb_table, &data_.session_pool_, call, data_.metrics_internal_, + data_.server_->mem_tracker()); + } } } context->Commit(); diff --git a/src/yb/yql/redis/redisserver/redisserver-test.cc b/src/yb/yql/redis/redisserver/redisserver-test.cc index 648eb54187c0..949fbcaf1afc 100644 --- a/src/yb/yql/redis/redisserver/redisserver-test.cc +++ b/src/yb/yql/redis/redisserver/redisserver-test.cc @@ -594,9 +594,9 @@ void TestRedisService::DoRedisTest(int line, const Callback& callback) { expected_callbacks_called_++; VLOG(4) << "Testing with line: " << __FILE__ << ":" << line; - client().Send(command, [this, line, reply_type, callback] (const RedisReply& reply) { - VLOG(4) << "Received response for line: " << __FILE__ << ":" << line - << " : " << reply.as_string() << ", of type: " << to_underlying(reply.get_type()); + client().Send(command, [this, line, reply_type, callback](const RedisReply& reply) { + VLOG(4) << "Received response for line: " << __FILE__ << ":" << line << " : " + << reply.as_string() << ", of type: " << to_underlying(reply.get_type()); num_callbacks_called_++; ASSERT_EQ(reply_type, reply.get_type()) << "Originator: " << __FILE__ << ":" << line << ", reply: " << reply.ToString(); @@ -1243,6 +1243,154 @@ void ConnectWithPassword( test->UseClient(nullptr); } +TEST_F(TestRedisService, TestSelect) { + shared_ptr rc1 = std::make_shared("127.0.0.1", server_port()); + shared_ptr rc2 = std::make_shared("127.0.0.1", server_port()); + shared_ptr rc3 = std::make_shared("127.0.0.1", server_port()); + + const string default_db("0"); + const string second_db("2"); + + UseClient(rc1); + DoRedisTestOk(__LINE__, {"SET", "key", "v1"}); + SyncClient(); + + DoRedisTestBulkString(__LINE__, {"GET", "key"}, "v1"); + SyncClient(); + + // Select without creating a db should fail. + DoRedisTestExpectError(__LINE__, {"SELECT", second_db.c_str()}); + SyncClient(); + + // The connection would be closed upon a bad Select. + DoRedisTestExpectError(__LINE__, {"PING"}); + SyncClient(); + + // Use a different client. + UseClient(rc2); + // Get the value from the default_db. + DoRedisTestBulkString(__LINE__, {"GET", "key"}, "v1"); + SyncClient(); + + // Create DB. + DoRedisTestOk(__LINE__, {"CREATEDB", second_db.c_str()}); + SyncClient(); + + // Select should now go through. + DoRedisTestOk(__LINE__, {"SELECT", second_db.c_str()}); + SyncClient(); + + // Get should be empty. + DoRedisTestNull(__LINE__, {"GET", "key"}); + SyncClient(); + // Set a diffferent value + DoRedisTestOk(__LINE__, {"SET", "key", "v2"}); + SyncClient(); + // Get that value + DoRedisTestBulkString(__LINE__, {"GET", "key"}, "v2"); + SyncClient(); + // Select the original db and get the value. + DoRedisTestOk(__LINE__, {"SELECT", default_db.c_str()}); + SyncClient(); + DoRedisTestBulkString(__LINE__, {"GET", "key"}, "v1"); + SyncClient(); + + UseClient(rc3); + // By default we should get the value from db-0 + DoRedisTestBulkString(__LINE__, {"GET", "key"}, "v1"); + // Select second db. + DoRedisTestOk(__LINE__, {"SELECT", second_db.c_str()}); + // Get that value + DoRedisTestBulkString(__LINE__, {"GET", "key"}, "v2"); + SyncClient(); + + // List DB. + DoRedisTestArray(__LINE__, {"LISTDB"}, {default_db, second_db}); + SyncClient(); + + // Delete DB. + DoRedisTestOk(__LINE__, {"DeleteDB", second_db.c_str()}); + SyncClient(); + // Expect to not be able to read the value. + DoRedisTestExpectError(__LINE__, {"GET", "key"}); + SyncClient(); + // Expect to not be able to read the value. + DoRedisTestExpectError(__LINE__, {"SET", "key", "v2"}); + SyncClient(); + + // List DB. + DoRedisTestArray(__LINE__, {"LISTDB"}, {default_db}); + SyncClient(); + + rc1->Disconnect(); + rc2->Disconnect(); + rc3->Disconnect(); + + UseClient(nullptr); + VerifyCallbacks(); +} + +TEST_F(TestRedisService, TestTruncate) { + const string default_db("0"); + const string second_db("2"); + + DoRedisTestOk(__LINE__, {"SET", "key", "v1"}); + DoRedisTestBulkString(__LINE__, {"GET", "key"}, "v1"); + SyncClient(); + + // Create DB. + DoRedisTestOk(__LINE__, {"CREATEDB", second_db.c_str()}); + // Select should now go through. + DoRedisTestOk(__LINE__, {"SELECT", second_db.c_str()}); + SyncClient(); + + // Set a diffferent value + DoRedisTestOk(__LINE__, {"SET", "key", "v2"}); + // Get that value + DoRedisTestBulkString(__LINE__, {"GET", "key"}, "v2"); + SyncClient(); + + // Select the original db and get the value. + DoRedisTestOk(__LINE__, {"SELECT", default_db.c_str()}); + DoRedisTestBulkString(__LINE__, {"GET", "key"}, "v1"); + SyncClient(); + + // Flush the default_db + DoRedisTestOk(__LINE__, {"FLUSHDB"}); + + // Get should be empty. + DoRedisTestOk(__LINE__, {"SELECT", default_db.c_str()}); + DoRedisTestNull(__LINE__, {"GET", "key"}); + SyncClient(); + DoRedisTestOk(__LINE__, {"SELECT", second_db.c_str()}); + DoRedisTestBulkString(__LINE__, {"GET", "key"}, "v2"); + SyncClient(); + + DoRedisTestOk(__LINE__, {"SELECT", default_db.c_str()}); + DoRedisTestOk(__LINE__, {"SET", "key", "v1"}); + DoRedisTestBulkString(__LINE__, {"GET", "key"}, "v1"); + SyncClient(); + + // Flush the default_db + DoRedisTestOk(__LINE__, {"FLUSHALL"}); + + DoRedisTestNull(__LINE__, {"GET", "key"}); + SyncClient(); + + DoRedisTestOk(__LINE__, {"SELECT", default_db.c_str()}); + DoRedisTestNull(__LINE__, {"GET", "key"}); + SyncClient(); + DoRedisTestOk(__LINE__, {"SELECT", second_db.c_str()}); + DoRedisTestNull(__LINE__, {"GET", "key"}); + SyncClient(); + + // List DB. + DoRedisTestArray(__LINE__, {"LISTDB"}, {default_db, second_db}); + SyncClient(); + + VerifyCallbacks(); +} + TEST_F(TestRedisService, TestMonitor) { constexpr uint32 kDelayMs = NonTsanVsTsan(100, 1000); expected_no_sessions_ = true;