From e6741cafc5ccacaf182fc416abd88d6f3f36438e Mon Sep 17 00:00:00 2001 From: wh002 Date: Tue, 14 Mar 2023 17:08:12 +0800 Subject: [PATCH] parse ranger policies --- src/common/replica_envs.h | 1 + src/common/replication.codes.h | 1 + src/common/replication_common.cpp | 2 + src/meta/app_env_validator.cpp | 3 +- .../ranger/ranger_resource_policy_manager.cpp | 382 +++++++++++++++++- .../ranger/ranger_resource_policy_manager.h | 48 ++- src/utils/error_code.h | 4 + 7 files changed, 438 insertions(+), 3 deletions(-) diff --git a/src/common/replica_envs.h b/src/common/replica_envs.h index f4c0d58294..4db367a7fa 100644 --- a/src/common/replica_envs.h +++ b/src/common/replica_envs.h @@ -56,6 +56,7 @@ class replica_envs static const std::string MANUAL_COMPACT_PERIODIC_BOTTOMMOST_LEVEL_COMPACTION; static const std::string BUSINESS_INFO; static const std::string REPLICA_ACCESS_CONTROLLER_ALLOWED_USERS; + static const std::string REPLICA_ACCESS_CONTROLLER_RANGER_POLICIES; static const std::string READ_QPS_THROTTLING; static const std::string READ_SIZE_THROTTLING; static const std::string BACKUP_REQUEST_QPS_THROTTLING; diff --git a/src/common/replication.codes.h b/src/common/replication.codes.h index 73df124979..b89cdd49ad 100644 --- a/src/common/replication.codes.h +++ b/src/common/replication.codes.h @@ -131,6 +131,7 @@ MAKE_EVENT_CODE_RPC(RPC_CM_START_MANUAL_COMPACT, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE_RPC(RPC_CM_QUERY_MANUAL_COMPACT_STATUS, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE_RPC(RPC_CM_GET_MAX_REPLICA_COUNT, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE_RPC(RPC_CM_SET_MAX_REPLICA_COUNT, TASK_PRIORITY_COMMON) +MAKE_EVENT_CODE(LPC_CM_GET_RANGER_POLICY, TASK_PRIORITY_COMMON) #undef CURRENT_THREAD_POOL #define CURRENT_THREAD_POOL THREAD_POOL_META_STATE diff --git a/src/common/replication_common.cpp b/src/common/replication_common.cpp index 4ab27a74af..63617797ac 100644 --- a/src/common/replication_common.cpp +++ b/src/common/replication_common.cpp @@ -384,6 +384,8 @@ const std::string replica_envs::ROCKSDB_BLOCK_CACHE_ENABLED("replica.rocksdb_blo const std::string replica_envs::BUSINESS_INFO("business.info"); const std::string replica_envs::REPLICA_ACCESS_CONTROLLER_ALLOWED_USERS( "replica_access_controller.allowed_users"); +const std::string replica_envs::REPLICA_ACCESS_CONTROLLER_RANGER_POLICIES( + "replica_access_controller.ranger_policies"); const std::string replica_envs::READ_QPS_THROTTLING("replica.read_throttling"); const std::string replica_envs::READ_SIZE_THROTTLING("replica.read_throttling_by_size"); const std::string diff --git a/src/meta/app_env_validator.cpp b/src/meta/app_env_validator.cpp index f541ee271a..41f9c299ce 100644 --- a/src/meta/app_env_validator.cpp +++ b/src/meta/app_env_validator.cpp @@ -212,7 +212,8 @@ void app_env_validator::register_all_validators() {replica_envs::MANUAL_COMPACT_PERIODIC_TRIGGER_TIME, nullptr}, {replica_envs::MANUAL_COMPACT_PERIODIC_TARGET_LEVEL, nullptr}, {replica_envs::MANUAL_COMPACT_PERIODIC_BOTTOMMOST_LEVEL_COMPACTION, nullptr}, - {replica_envs::REPLICA_ACCESS_CONTROLLER_ALLOWED_USERS, nullptr}}; + {replica_envs::REPLICA_ACCESS_CONTROLLER_ALLOWED_USERS, nullptr}, + {replica_envs::REPLICA_ACCESS_CONTROLLER_RANGER_POLICIES, nullptr}}; } } // namespace replication diff --git a/src/runtime/ranger/ranger_resource_policy_manager.cpp b/src/runtime/ranger/ranger_resource_policy_manager.cpp index c9025ca516..fcd2f7a899 100644 --- a/src/runtime/ranger/ranger_resource_policy_manager.cpp +++ b/src/runtime/ranger/ranger_resource_policy_manager.cpp @@ -20,6 +20,17 @@ #include #include +// Disable class-memaccess warning to facilitate compilation with gcc>7 +// https://github.com/Tencent/rapidjson/issues/1700 +// #pragma GCC diagnostic push +// #if defined(__GNUC__) && __GNUC__ >= 8 +// #pragma GCC diagnostic ignored "-Wclass-memaccess" +// #endif +// #include +// #pragma GCC diagnostic pop + +#include "common/replication.codes.h" +#include "common/replica_envs.h" #include "meta/meta_options.h" #include "meta/meta_service.h" #include "ranger_resource_policy_manager.h" @@ -30,6 +41,16 @@ namespace dsn { namespace ranger { +DSN_DEFINE_uint32(security, + update_ranger_policy_interval_sec, + 5, + "The interval seconds of meta " + "server to pull the latest " + "access control policy from " + "Ranger service."); +DSN_DEFINE_string(ranger, ranger_service_url, "", "Apache Ranger service url."); +DSN_DEFINE_string(ranger, ranger_service_name, "", "use policy name."); + #define RETURN_ERR_IF_MISSING_MEMBER(obj, member) \ do { \ if (!obj.IsObject() || !obj.HasMember(member)) { \ @@ -83,9 +104,11 @@ const std::map kAccessTypeMaping({{"READ", access_type {"CONTROL", access_type::kControl}}); } // anonymous namespace +std::chrono::milliseconds load_ranger_policy_retry_delay_ms(10000); + ranger_resource_policy_manager::ranger_resource_policy_manager( dsn::replication::meta_service *meta_svc) - : _meta_svc(meta_svc) //, _local_policy_version(0) + : _meta_svc(meta_svc), _local_policy_version(0) { _ranger_policy_meta_root = dsn::replication::meta_options::concat_path_unix_style( _meta_svc->cluster_root(), "ranger_policy_meta_root"); @@ -171,5 +194,362 @@ void ranger_resource_policy_manager::parse_policies_from_json(const rapidjson::V policies.emplace_back(pi); } } + +dsn::error_code ranger_resource_policy_manager::update_policies_from_ranger_service() +{ + std::string ranger_policies; + ERR_LOG_AND_RETURN_NOT_OK(pull_policies_from_ranger_service(&ranger_policies), + "Pull Ranger policies failed."); + LOG_DEBUG("Pull Ranger policies success."); + + auto err_code = load_policies_from_json(ranger_policies); + if (err_code == dsn::ERR_RANGER_POLICIES_NO_NEED_UPDATE) { + LOG_DEBUG("Skip to update local policies."); + // for the newly created table, its app envs must be empty. This needs to be executed + // periodically to update the table's app envs, regardless of whether the Ranger policy is + // updated or not. + CHECK_EQ_MSG(dsn::ERR_OK, sync_policies_to_app_envs(), "Sync policies to app envs failed."); + LOG_DEBUG("Sync policies to app envs succeeded."); + return dsn::ERR_OK; + } + ERR_LOG_AND_RETURN_NOT_OK(err_code, "Parse Ranger policies failed."); + + start_to_dump_and_sync_policies(); + + return dsn::ERR_OK; +} + +dsn::error_code ranger_resource_policy_manager::pull_policies_from_ranger_service( + std::string *ranger_policies) const +{ + std::string cmd = + fmt::format("curl {}/{}", FLAGS_ranger_service_url, FLAGS_ranger_service_name); + std::stringstream resp; + if (dsn::utils::pipe_execute(cmd.c_str(), resp) != 0) { + return dsn::ERR_SYNC_RANGER_POLICIES_FAILED; + } + + *ranger_policies = resp.str(); + return dsn::ERR_OK; +} + +dsn::error_code ranger_resource_policy_manager::load_policies_from_json(const std::string &data) +{ + // The Ranger policy pulled from Ranger service demo. + /* + { + "serviceName": "PEGASUS1", + "serviceId": 1069, + "policyVersion": 60, + "policyUpdateTime": 1673254471000, + "policies": [{ + "id": 5334, + "guid": "c7918f8c-921a-4f3d-b9d7-bce7009ee5f8", + "isEnabled": true, + "version": 13, + "service": "PEGASUS1", + "name": "all - database", + "policyType": 0, + "policyPriority": 0, + "description": "Policy for all - database", + "isAuditEnabled": true, + "resources": { + "database": { + "values": ["PEGASUS1"], + "isExcludes": false, + "isRecursive": true + } + }, + "policyItems": [{ + "accesses": [{ + "type": "create", + "isAllowed": true + }, { + "type": "drop", + "isAllowed": true + }, { + "type": "control", + "isAllowed": true + }, { + "type": "metadata", + "isAllowed": true + }, { + "type": "list", + "isAllowed": true + }], + "users": ["PEGASUS1"], + "groups": [], + "roles": [], + "conditions": [], + "delegateAdmin": true + }], + "denyPolicyItems": [], + "allowExceptions": [], + "denyExceptions": [], + "dataMaskPolicyItems": [], + "rowFilterPolicyItems": [], + "serviceType": "pegasus", + "options": {}, + "validitySchedules": [], + "policyLabels": [], + "zoneName": "", + "isDenyAllElse": false + }], + "auditMode": "audit-default", + "serviceConfig": {} + } + */ + rapidjson::Document doc; + doc.Parse(data.c_str()); + + // Check if it is needed to update policies. + RETURN_ERR_IF_MISSING_MEMBER(doc, "policyVersion"); + int remote_policy_version = doc["policyVersion"].GetInt(); + if (_local_policy_version == remote_policy_version) { + LOG_DEBUG("Ranger policy version: {}, no need to update.", _local_policy_version); + return dsn::ERR_RANGER_POLICIES_NO_NEED_UPDATE; + } + + if (_local_policy_version > remote_policy_version) { + LOG_WARNING("Local Ranger policy version ({}) is larger than remote version ({}), please " + "check Ranger services ({}).", + _local_policy_version, + remote_policy_version, + FLAGS_ranger_service_name); + return dsn::ERR_RANGER_POLICIES_NO_NEED_UPDATE; + } + + if (_local_policy_version == 0) { + _local_policy_version = remote_policy_version; + } + + // Update policies. + _all_resource_policies.clear(); + + // TODO(wanghao): it's optional + // Provide a DATABASE default policy for legacy tables. + // ranger_resource_policy default_database_policy; + // ranger_resource_policy::create_default_database_policy(default_database_policy); + // _all_resource_policies[enum_to_string(resource_type::kDatabase)] = {default_database_policy}; + + RETURN_ERR_IF_MISSING_MEMBER(doc, "policies"); + const rapidjson::Value &policies = doc["policies"]; + RETURN_ERR_IF_NOT_ARRAY(policies); + for (const auto &policy : policies.GetArray()) { + RETURN_ERR_IF_MISSING_MEMBER(policy, "isEnabled"); + // 1. Check if the policy is enabled or not. + if (!policy["isEnabled"].IsBool() || !policy["isEnabled"].GetBool()) { + continue; + } + + // 2. Parse resource type. + RETURN_ERR_IF_MISSING_MEMBER(policy, "resources"); + std::map> values_of_resource_type; + for (const auto &resource : policy["resources"].GetObject()) { + RETURN_ERR_IF_MISSING_MEMBER(resource.value, "values"); + RETURN_ERR_IF_NOT_ARRAY((resource.value)["values"]); + std::unordered_set values; + for (const auto &v : (resource.value)["values"].GetArray()) { + values.insert(v.GetString()); + } + values_of_resource_type.emplace(std::make_pair(resource.name.GetString(), values)); + } + + // 3. Construct ACL policy. + ranger_resource_policy resource_policy; + CONTINUE_IF_MISSING_MEMBER(policy, "name"); + resource_policy.name = policy["name"].GetString(); + + resource_type rt = resource_type::kUnknown; + do { + // TODO(wanghao): refactor the following code + // parse Ranger policies json string into `values_of_resource_type`, distinguish + // resource types by `values_of_resource_type.size()` + if (values_of_resource_type.size() == 1) { + auto iter = values_of_resource_type.find("global"); + if (iter != values_of_resource_type.end()) { + rt = resource_type::kGlobal; + break; + } + iter = values_of_resource_type.find("database"); + if (iter != values_of_resource_type.end()) { + resource_policy.database_names = iter->second; + rt = resource_type::kDatabase; + break; + } + } else if (values_of_resource_type.size() == 2) { + auto iter1 = values_of_resource_type.find("database"); + auto iter2 = values_of_resource_type.find("table"); + if (iter1 != values_of_resource_type.end() && + iter2 != values_of_resource_type.end()) { + resource_policy.database_names = iter1->second; + resource_policy.table_names = iter2->second; + rt = resource_type::kDatabaseTable; + break; + } + } + return dsn::ERR_RANGER_PARSE_ACL; + } while (false); + + parse_policies_from_json(policy["policyItems"], resource_policy.policies.allow_policies); + parse_policies_from_json(policy["denyPolicyItems"], resource_policy.policies.deny_policies); + parse_policies_from_json(policy["allowExceptions"], + resource_policy.policies.allow_policies_exclude); + parse_policies_from_json(policy["denyExceptions"], + resource_policy.policies.deny_policies_exclude); + + // 4. Add the ACL policy. + auto ret = _all_resource_policies.emplace(enum_to_string(rt), + resource_policies({resource_policy})); + if (!ret.second) { + ret.first->second.emplace_back(resource_policy); + } + } + + return dsn::ERR_OK; +} + +void ranger_resource_policy_manager::start_to_dump_and_sync_policies() +{ + LOG_DEBUG("Start to create Ranger policy meta root on remote storage."); + dsn::task_ptr sync_task = dsn::tasking::create_task( + LPC_CM_GET_RANGER_POLICY, &_tracker, [this]() { dump_and_sync_policies(); }); + _meta_svc->get_remote_storage()->create_node( + _ranger_policy_meta_root, LPC_CM_GET_RANGER_POLICY, [this, sync_task](dsn::error_code err) { + if (err == dsn::ERR_OK || err == dsn::ERR_NODE_ALREADY_EXIST) { + LOG_DEBUG("Create Ranger policy meta root succeed."); + sync_task->enqueue(); + return; + } + CHECK_EQ(err, dsn::ERR_TIMEOUT); + LOG_ERROR("Create Ranger policy meta root timeout, try it later."); + dsn::tasking::enqueue(LPC_CM_GET_RANGER_POLICY, + &_tracker, + [this]() { start_to_dump_and_sync_policies(); }, + 0, + load_ranger_policy_retry_delay_ms); + }); +} + +void ranger_resource_policy_manager::dump_and_sync_policies() +{ + LOG_DEBUG("Start to sync Ranger policies to remote storage."); + + dump_policies_to_remote_storage(); + LOG_DEBUG("Dump Ranger policies to remote storage succeed."); + + update_cached_policies(); + LOG_DEBUG("Update using resources policies succeed."); + + CHECK_EQ_MSG(dsn::ERR_OK, sync_policies_to_app_envs(), "Sync policies to app envs failed."); + LOG_DEBUG("Sync policies to app envs succeeded."); +} + +void ranger_resource_policy_manager::dump_policies_to_remote_storage() +{ + dsn::blob value = json::json_forwarder::encode(_all_resource_policies); + _meta_svc->get_remote_storage()->set_data( + _ranger_policy_meta_root, value, LPC_CM_GET_RANGER_POLICY, [this](dsn::error_code e) { + if (e == dsn::ERR_OK) { + LOG_DEBUG("Dump Ranger policies to remote storage succeed."); + return; + } + CHECK_EQ_MSG(e, dsn::ERR_TIMEOUT, "Dump Ranger policies to remote storage failed."); + LOG_ERROR("Dump Ranger policies to remote storage timeout, retry later."); + dsn::tasking::enqueue(LPC_CM_GET_RANGER_POLICY, + &_tracker, + [this]() { dump_policies_to_remote_storage(); }, + 0, + load_ranger_policy_retry_delay_ms); + }); +} + +void ranger_resource_policy_manager::update_cached_policies() +{ + { + utils::auto_write_lock l(_global_policies_lock); + _global_policies_cache.swap(_all_resource_policies[enum_to_string(resource_type::kGlobal)]); + // TODO(wanghao): provide a query method + } + { + utils::auto_write_lock l(_database_policies_lock); + _database_policies_cache.swap( + _all_resource_policies[enum_to_string(resource_type::kDatabase)]); + // TODO(wanghao): provide a query method + } +} + +dsn::error_code ranger_resource_policy_manager::sync_policies_to_app_envs() +{ + const auto &table_policies = + _all_resource_policies.find(enum_to_string(resource_type::kDatabaseTable)); + if (table_policies == _all_resource_policies.end()) { + LOG_INFO("DATABASE_TABLE level policy is empty, skip to sync app envs."); + return dsn::ERR_OK; + } + + dsn::replication::configuration_list_apps_response list_resp; + dsn::replication::configuration_list_apps_request list_req; + list_req.status = dsn::app_status::AS_AVAILABLE; + _meta_svc->get_server_state()->list_apps(list_req, list_resp); + ERR_LOG_AND_RETURN_NOT_OK(list_resp.err, "list_apps failed."); + for (const auto &app : list_resp.infos) { + std::string database_name = get_database_name_from_app_name(app.app_name); + std::string table_name; + if (database_name.empty()) { + database_name = "*"; + table_name = app.app_name; + } else { + table_name = app.app_name.substr(database_name.size()); + } + + auto req = dsn::make_unique(); + req->__set_app_name(app.app_name); + req->__set_keys( + {dsn::replication::replica_envs::REPLICA_ACCESS_CONTROLLER_RANGER_POLICIES}); + bool has_match_policy = false; + for (const auto &policy : table_policies->second) { + if (policy.database_names.count(database_name) == 0) { + continue; + } + + // if table name does not conform to the naming rules(database_name.table_name), + // database is defined by "*" in ranger for acl matching + if (policy.table_names.count("*") != 0 || policy.table_names.count(table_name) != 0) { + has_match_policy = true; + req->__set_op(dsn::replication::app_env_operation::type::APP_ENV_OP_SET); + req->__set_values( + {json::json_forwarder::encode(policy.policies).to_string()}); + + dsn::replication::update_app_env_rpc rpc(std::move(req), LPC_CM_GET_RANGER_POLICY); + _meta_svc->get_server_state()->set_app_envs(rpc); + ERR_LOG_AND_RETURN_NOT_OK(rpc.response().err, "set_app_envs failed."); + break; + } + } + + // There is no matched policy, clear app Ranger policy + if (!has_match_policy) { + req->__set_op(dsn::replication::app_env_operation::type::APP_ENV_OP_DEL); + + dsn::replication::update_app_env_rpc rpc(std::move(req), LPC_CM_GET_RANGER_POLICY); + _meta_svc->get_server_state()->del_app_envs(rpc); + ERR_LOG_AND_RETURN_NOT_OK(rpc.response().err, "del_app_envs failed."); + } + } + + return dsn::ERR_OK; +} + +std::string get_database_name_from_app_name(const std::string &app_name) +{ + std::string prefix = utils::find_string_prefix(app_name, '.'); + if (prefix.empty() || prefix == app_name) { + return std::string(); + } + + return prefix; +} + } // namespace ranger } // namespace dsn diff --git a/src/runtime/ranger/ranger_resource_policy_manager.h b/src/runtime/ranger/ranger_resource_policy_manager.h index 5b584a2756..46ec11153e 100644 --- a/src/runtime/ranger/ranger_resource_policy_manager.h +++ b/src/runtime/ranger/ranger_resource_policy_manager.h @@ -25,15 +25,19 @@ #include "common/json_helper.h" #include "gtest/gtest_prod.h" +#include "meta/server_state.h" #include "ranger_resource_policy.h" #include "rapidjson/document.h" +#include "runtime/api_task.h" #include "runtime/ranger/access_type.h" #include "utils/enum_helper.h" +#include "utils/error_code.h" namespace dsn { namespace replication { class meta_service; +class server_state; } enum class resource_type @@ -73,12 +77,48 @@ class ranger_resource_policy_manager static void parse_policies_from_json(const rapidjson::Value &data, std::vector &policies); + // Update policies from Ranger service. + dsn::error_code update_policies_from_ranger_service(); + + // Pull policies in JSON format from Ranger service. + dsn::error_code pull_policies_from_ranger_service(std::string *ranger_policies) const; + + // Load policies from JSON formated string. + dsn::error_code load_policies_from_json(const std::string &data); + + // Create the path to save policies in remote storage, and update using resources policies. + void start_to_dump_and_sync_policies(); + + // Sync policies in use from Ranger service. + void dump_and_sync_policies(); + + // Dump policies to remote storage. + void dump_policies_to_remote_storage(); + + // Update the cached global/database resources policies. + void update_cached_policies(); + + // Sync policies to app_envs(REPLICA_ACCESS_CONTROLLER_RANGER_POLICIES). + dsn::error_code sync_policies_to_app_envs(); + private: + dsn::task_tracker _tracker; + // The path where policies to be saved in remote storage. std::string _ranger_policy_meta_root; replication::meta_service *_meta_svc; + // The cache of the global resources policies, it's a subset of '_all_resource_policies'. + utils::rw_lock_nr _global_policies_lock; // [ + resource_policies _global_policies_cache; + // ] + + // The cache of the database resources policies, it's a subset of '_all_resource_policies'. + utils::rw_lock_nr _database_policies_lock; // [ + resource_policies _database_policies_cache; + // ] + // The access type of RPCs which access global level resources. access_type_of_rpc_code _ac_type_of_global_rpcs; @@ -86,7 +126,7 @@ class ranger_resource_policy_manager access_type_of_rpc_code _ac_type_of_database_rpcs; // The Ranger policy version to determine whether to update. - // int _local_policy_version; + int _local_policy_version; // All Ranger ACL policies. all_resource_policies _all_resource_policies; @@ -95,5 +135,11 @@ class ranger_resource_policy_manager FRIEND_TEST(ranger_resource_policy_manager_test, parse_policies_from_json_for_test); }; + +// Try to get the database name of 'app_name'. +// When using Ranger for ACL, the constraint table naming rule is +// "{database_name}.{table_name}", use "." to split database name and table name. +// Return an empty string if 'app_name' is not a valid Ranger rule table name. +std::string get_database_name_from_app_name(const std::string &app_name); } // namespace ranger } // namespace dsn diff --git a/src/utils/error_code.h b/src/utils/error_code.h index c9fb34437d..613a67efee 100644 --- a/src/utils/error_code.h +++ b/src/utils/error_code.h @@ -168,4 +168,8 @@ DEFINE_ERR_CODE(ERR_PARENT_PARTITION_MISUSED) DEFINE_ERR_CODE(ERR_CHILD_NOT_READY) DEFINE_ERR_CODE(ERR_DISK_INSUFFICIENT) DEFINE_ERR_CODE(ERR_RETRY_EXHAUSTED) + +DEFINE_ERR_CODE(ERR_SYNC_RANGER_POLICIES_FAILED) +DEFINE_ERR_CODE(ERR_RANGER_PARSE_ACL) +DEFINE_ERR_CODE(ERR_RANGER_POLICIES_NO_NEED_UPDATE) } // namespace dsn