Skip to content

Commit

Permalink
Allow DescribePartition request if UpdateRow permission is granted (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
qyryq committed Apr 25, 2024
1 parent b1eb036 commit 65f7d70
Show file tree
Hide file tree
Showing 7 changed files with 146 additions and 47 deletions.
4 changes: 2 additions & 2 deletions ydb/core/tx/scheme_board/cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,8 @@ namespace {
return entry.KeyDescription->SecurityObject;
}

static ui32 GetAccess(const TNavigate::TEntry&) {
return NACLib::EAccessRights::DescribeSchema;
static ui32 GetAccess(const TNavigate::TEntry& entry) {
return entry.Access;
}

static ui32 GetAccess(const TResolve::TEntry& entry) {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/scheme_cache/scheme_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ struct TSchemeCacheNavigate {

// in
TVector<TString> Path;
ui32 Access = NACLib::DescribeSchema;
TTableId TableId;
ERequestType RequestType = ERequestType::ByPath;
EOp Operation = OpUnknown;
Expand Down
73 changes: 71 additions & 2 deletions ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
#include "impl/trace_lazy.h"
#include "ut_utils/topic_sdk_test_setup.h"

#include <format>
#include <ydb/library/persqueue/topic_parser_public/topic_parser.h>
#include <ydb/public/api/protos/persqueue_error_codes_v1.pb.h>
#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h>
#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h>
Expand All @@ -10,8 +13,6 @@
#include <library/cpp/threading/future/future.h>
#include <library/cpp/threading/future/async.h>

#include <future>

namespace NYdb::NTopic::NTests {

Y_UNIT_TEST_SUITE(Describe) {
Expand Down Expand Up @@ -344,5 +345,73 @@ namespace NYdb::NTopic::NTests {
DescribeConsumer(setup, client, false, false, true, true);
DescribePartition(setup, client, false, false, true, true);
}

TDescribePartitionResult RunPermissionTest(TTopicSdkTestSetup& setup, int userId, bool existingTopic, bool allowUpdateRow, bool allowDescribeSchema) {
TString authToken = TStringBuilder() << "x-user-" << userId << "@builtin";
Cerr << std::format("=== existingTopic={} allowUpdateRow={} allowDescribeSchema={} authToken={}\n",
existingTopic, allowUpdateRow, allowDescribeSchema, std::string(authToken));

auto driverConfig = setup.MakeDriverConfig().SetAuthToken(authToken);
auto client = TTopicClient(TDriver(driverConfig));
auto settings = TDescribePartitionSettings().IncludeLocation(true);
i64 testPartitionId = 0;

NACLib::TDiffACL acl;
if (allowDescribeSchema) {
acl.AddAccess(NACLib::EAccessType::Allow, NACLib::DescribeSchema, authToken);
}
if (allowUpdateRow) {
acl.AddAccess(NACLib::EAccessType::Allow, NACLib::UpdateRow, authToken);
}
setup.GetServer().AnnoyingClient->ModifyACL("/Root", TEST_TOPIC, acl.SerializeAsString());

return client.DescribePartition(existingTopic ? TEST_TOPIC : "bad-topic", testPartitionId, settings).GetValueSync();
}

Y_UNIT_TEST(DescribePartitionPermissions) {
TTopicSdkTestSetup setup(TEST_CASE_NAME);
setup.GetServer().EnableLogs({NKikimrServices::TX_PROXY_SCHEME_CACHE, NKikimrServices::SCHEME_BOARD_SUBSCRIBER}, NActors::NLog::PRI_TRACE);

int userId = 0;

struct Expectation {
bool existingTopic;
bool allowUpdateRow;
bool allowDescribeSchema;
EStatus status;
NYql::TIssueCode issueCode;
};

std::vector<Expectation> expectations{
{0, 0, 0, EStatus::SCHEME_ERROR, Ydb::PersQueue::ErrorCode::ACCESS_DENIED},
{0, 0, 1, EStatus::SCHEME_ERROR, Ydb::PersQueue::ErrorCode::ACCESS_DENIED},
{0, 1, 0, EStatus::SCHEME_ERROR, Ydb::PersQueue::ErrorCode::ACCESS_DENIED},
{0, 1, 1, EStatus::SCHEME_ERROR, Ydb::PersQueue::ErrorCode::ACCESS_DENIED},
{1, 0, 0, EStatus::SCHEME_ERROR, Ydb::PersQueue::ErrorCode::ACCESS_DENIED},
{1, 0, 1, EStatus::SUCCESS, 0},
{1, 1, 0, EStatus::SUCCESS, 0},
{1, 1, 1, EStatus::SUCCESS, 0},
};

for (auto [existing, update, describe, status, issue] : expectations) {
auto result = RunPermissionTest(setup, userId++, existing, update, describe);
auto resultStatus = result.GetStatus();
auto line = TStringBuilder() << "=== status=" << resultStatus;
NYql::TIssueCode resultIssue = 0;
if (!result.GetIssues().Empty()) {
resultIssue = result.GetIssues().begin()->GetCode();
line << " issueCode=" << resultIssue;
}
Cerr << (line << " issues=" << result.GetIssues().ToOneLineString() << Endl);

UNIT_ASSERT_EQUAL(resultStatus, status);
UNIT_ASSERT_EQUAL(resultIssue, issue);
if (resultStatus == EStatus::SUCCESS) {
auto& p = result.GetPartitionDescription().GetPartition();
UNIT_ASSERT(p.GetActive());
UNIT_ASSERT(p.GetPartitionLocation().Defined());
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -359,4 +359,4 @@ namespace NYdb::NTopic::NTests {
writeSession->Close();
}
}
}
}
57 changes: 26 additions & 31 deletions ydb/services/lib/actors/pq_schema_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,49 +128,44 @@ namespace NKikimr::NGRpcProxy::V1 {
return false;
};


void SendDescribeProposeRequest(const NActors::TActorContext& ctx, bool showPrivate) {
auto navigateRequest = std::make_unique<NSchemeCache::TSchemeCacheNavigate>();
navigateRequest->DatabaseName = CanonizePath(Database);

NSchemeCache::TSchemeCacheNavigate::TEntry entry;
entry.Path = NKikimr::SplitPath(GetTopicPath());
entry.SyncVersion = true;
entry.ShowPrivatePath = showPrivate;
entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpList;
navigateRequest->ResultSet.emplace_back(entry);

if (!SetRequestToken(navigateRequest.get())) {
AddIssue(FillIssue("Unauthenticated access is forbidden, please provide credentials",
Ydb::PersQueue::ErrorCode::ACCESS_DENIED));
return RespondWithCode(Ydb::StatusIds::UNAUTHORIZED);
}

navigateRequest->DatabaseName = CanonizePath(Database);
navigateRequest->ResultSet.emplace_back(NSchemeCache::TSchemeCacheNavigate::TEntry{
.Path = NKikimr::SplitPath(GetTopicPath()),
.Access = CheckAccessWithWriteTopicPermission ? NACLib::UpdateRow : NACLib::DescribeSchema,
.Operation = NSchemeCache::TSchemeCacheNavigate::OpList,
.ShowPrivatePath = showPrivate,
.SyncVersion = true,
});
if (!IsDead) {
ctx.Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(navigateRequest.release()));
}
}

bool ReplyIfNotTopic(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
const NSchemeCache::TSchemeCacheNavigate* result = ev->Get()->Request.Get();
Y_ABORT_UNLESS(result->ResultSet.size() == 1);
const auto& response = result->ResultSet.front();
const TString path = JoinPath(response.Path);

if (ev->Get()->Request.Get()->ResultSet.size() != 1 ||
ev->Get()->Request.Get()->ResultSet.begin()->Kind !=
NSchemeCache::TSchemeCacheNavigate::KindTopic) {
AddIssue(FillIssue(TStringBuilder() << "path '" << path << "' is not a topic",
Ydb::PersQueue::ErrorCode::VALIDATION_ERROR));
RespondWithCode(Ydb::StatusIds::SCHEME_ERROR);
return true;
auto const& entries = ev->Get()->Request.Get()->ResultSet;
Y_ABORT_UNLESS(entries.size() == 1);
auto const& entry = entries.front();
if (entry.Kind == NSchemeCache::TSchemeCacheNavigate::KindTopic) {
return false;
}
return false;
AddIssue(FillIssue(TStringBuilder() << "path '" << JoinPath(entry.Path) << "' is not a topic",
Ydb::PersQueue::ErrorCode::VALIDATION_ERROR));
RespondWithCode(Ydb::StatusIds::SCHEME_ERROR);
return true;
}

void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
const NSchemeCache::TSchemeCacheNavigate* result = ev->Get()->Request.Get();
Y_ABORT_UNLESS(result->ResultSet.size() == 1);
const auto& response = result->ResultSet.front();
auto const& entries = ev->Get()->Request.Get()->ResultSet;
Y_ABORT_UNLESS(entries.size() == 1);
const auto& response = entries.front();
const TString path = JoinPath(response.Path);

switch (response.Status) {
Expand Down Expand Up @@ -245,7 +240,6 @@ namespace NKikimr::NGRpcProxy::V1 {
}
}


void StateWork(TAutoPtr<IEventHandle>& ev) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
Expand All @@ -256,6 +250,7 @@ namespace NKikimr::NGRpcProxy::V1 {

protected:
bool IsDead = false;
bool CheckAccessWithWriteTopicPermission = false;
const TString TopicPath;
const TString Database;
};
Expand Down Expand Up @@ -339,13 +334,13 @@ namespace NKikimr::NGRpcProxy::V1 {
}

bool SetRequestToken(NSchemeCache::TSchemeCacheNavigate* request) const override {
if (this->Request_->GetSerializedToken().empty()) {
return !(AppData()->PQConfig.GetRequireCredentialsInNewProtocol());
} else {
request->UserToken = new NACLib::TUserToken(this->Request_->GetSerializedToken());
if (auto const& token = this->Request_->GetSerializedToken()) {
request->UserToken = new NACLib::TUserToken(token);
return true;
}
return !(AppData()->PQConfig.GetRequireCredentialsInNewProtocol());
}

bool ProcessCdc(const NSchemeCache::TSchemeCacheNavigate::TEntry& response) override {
if constexpr (THasCdcStreamCompatibility<TDerived>::Value) {
if (static_cast<TDerived*>(this)->IsCdcStreamCompatible()) {
Expand Down
44 changes: 37 additions & 7 deletions ydb/services/persqueue_v1/actors/schema_actors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1319,30 +1319,60 @@ TDescribePartitionActor::TDescribePartitionActor(NKikimr::NGRpcService::IRequest
{
}

void TDescribePartitionActor::Bootstrap(const NActors::TActorContext& ctx)
{
void TDescribePartitionActor::Bootstrap(const NActors::TActorContext& ctx) {
LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "TDescribePartitionActor" << ctx.SelfID.ToString() << ": Bootstrap");
CheckAccessWithWriteTopicPermission = true;
TBase::Bootstrap(ctx);
SendDescribeProposeRequest(ctx);
Become(&TDescribePartitionActor::StateWork);
}

void TDescribePartitionActor::StateWork(TAutoPtr<IEventHandle>& ev) {
switch (ev->GetTypeRewrite()) {
case TEvTxProxySchemeCache::TEvNavigateKeySetResult::EventType:
if (NeedToRequestWithDescribeSchema(ev)) {
// We do not have the UpdateRow permission. Check if we're allowed to DescribeSchema.
CheckAccessWithWriteTopicPermission = false;
SendDescribeProposeRequest(ActorContext());
break;
}
[[fallthrough]];
default:
if (!TDescribeTopicActorImpl::StateWork(ev, ActorContext())) {
TBase::StateWork(ev);
};
}
}

void TDescribePartitionActor::HandleCacheNavigateResponse(
TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev
) {
Y_ABORT_UNLESS(ev->Get()->Request.Get()->ResultSet.size() == 1); // describe for only one topic
// Return true if we need to send a second request to SchemeCache with DescribeSchema permission,
// because the first request checking the UpdateRow permission resulted in an AccessDenied error.
bool TDescribePartitionActor::NeedToRequestWithDescribeSchema(TAutoPtr<IEventHandle>& ev) {
if (!CheckAccessWithWriteTopicPermission) {
// We've already sent a request with DescribeSchema, ev is a response to it.
return false;
}

auto evNav = *reinterpret_cast<typename TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr*>(&ev);
auto const& entries = evNav->Get()->Request.Get()->ResultSet;
Y_ABORT_UNLESS(entries.size() == 1);

if (entries.front().Status != NSchemeCache::TSchemeCacheNavigate::EStatus::AccessDenied) {
// We do have access to the requested entity or there was an error.
// Transfer ownership to the ev pointer, and let the base classes' StateWork methods handle the response.
ev = *reinterpret_cast<TAutoPtr<IEventHandle>*>(&evNav);
return false;
}

return true;
}

void TDescribePartitionActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
auto const& entries = ev->Get()->Request.Get()->ResultSet;
Y_ABORT_UNLESS(entries.size() == 1); // describe for only one topic
if (ReplyIfNotTopic(ev)) {
return;
}
PQGroupInfo = ev->Get()->Request->ResultSet[0].PQGroupInfo;
PQGroupInfo = entries[0].PQGroupInfo;
auto* partRes = Result.mutable_partition();
partRes->set_partition_id(Settings.Partitions[0]);
partRes->set_active(true);
Expand Down
12 changes: 8 additions & 4 deletions ydb/services/persqueue_v1/actors/schema_actors.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ struct TDescribeTopicActorSettings {
TVector<ui32> Partitions;
bool RequireStats = false;
bool RequireLocation = false;

TDescribeTopicActorSettings()
: Mode(EMode::DescribeTopic)
{}
Expand All @@ -85,7 +85,7 @@ struct TDescribeTopicActorSettings {
, RequireStats(requireStats)
, RequireLocation(requireLocation)
{}

static TDescribeTopicActorSettings DescribeTopic(bool requireStats, bool requireLocation, const TVector<ui32>& partitions = {}) {
TDescribeTopicActorSettings res{EMode::DescribeTopic, requireStats, requireLocation};
res.Partitions = partitions;
Expand All @@ -104,7 +104,7 @@ struct TDescribeTopicActorSettings {
res.Partitions = partitions;
return res;
}

static TDescribeTopicActorSettings DescribePartitionSettings(ui32 partition, bool stats, bool location) {
TDescribeTopicActorSettings res{EMode::DescribePartitions, stats, location};
res.Partitions = {partition};
Expand Down Expand Up @@ -261,9 +261,13 @@ using TTabletInfo = TDescribeTopicActorImpl::TTabletInfo;
void ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEvPersQueue::TEvStatusResponse::TPtr& ev, const TActorContext& ctx) override;
void ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEvPersQueue::TEvReadSessionsInfoResponse::TPtr& ev, const TActorContext& ctx) override;
bool ApplyResponse(TEvPersQueue::TEvGetPartitionsLocationResponse::TPtr& ev, const TActorContext& ctx) override;

virtual void Reply(const TActorContext& ctx) override;

private:

bool NeedToRequestWithDescribeSchema(TAutoPtr<IEventHandle>& ev);

private:
TIntrusiveConstPtr<NSchemeCache::TSchemeCacheNavigate::TPQGroupInfo> PQGroupInfo;
Ydb::Topic::DescribePartitionResult Result;
Expand Down

0 comments on commit 65f7d70

Please sign in to comment.