Skip to content

Commit

Permalink
YQ-2068 ut for generic provider lookup actor (#4246)
Browse files Browse the repository at this point in the history
  • Loading branch information
zverevgeny authored May 27, 2024
1 parent b9a6345 commit b098b52
Show file tree
Hide file tree
Showing 8 changed files with 333 additions and 16 deletions.
16 changes: 16 additions & 0 deletions ydb/library/yql/providers/generic/actors/ut/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
UNITTEST_FOR(ydb/library/yql/providers/generic/actors)

PEERDIR(
ydb/library/yql/sql/pg_dummy
ydb/library/yql/providers/generic/connector/libcpp/ut_helpers
ydb/library/actors/testlib
library/cpp/testing/unittest
)

SRCS(
yql_generic_lookup_actor_ut.cpp
)

YQL_LAST_ABI_VERSION()

END()
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
#include <ydb/library/yql/minikql/mkql_alloc.h>
#include <ydb/library/yql/minikql/mkql_node.h>
#include <ydb/library/yql/minikql/mkql_node_builder.h>
#include <ydb/library/yql/public/udf/udf_value.h>
#include <ydb/library/yql/minikql/mkql_type_builder.h>

#include <ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.h>

#include <ydb/library/actors/testlib/test_runtime.h>
#include <ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.h>
#include <ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/test_creds.h>
#include <ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.h>
#include <library/cpp/testing/unittest/registar.h>

#include <ydb/library/yql/utils/log/proto/logger_config.pb.h>
#include <ydb/library/yql/utils/log/log.h>

using namespace NYql::NConnector;
using namespace NYql::NConnector::NTest;
using namespace NYql;
using namespace NActors;

Y_UNIT_TEST_SUITE(GenericProviderLookupActor) {

//Simple actor to call IDqAsyncLookupSource::AsyncLookup from an actor system's thread
class TCallLookupActor: public TActorBootstrapped<TCallLookupActor> {
public:
TCallLookupActor(
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
NYql::NDq::IDqAsyncLookupSource* lookupSource,
NKikimr::NMiniKQL::TUnboxedValueVector&& keysToLookUp)
: Alloc(alloc)
, LookupSource(lookupSource)
, KeysToLookUp(std::move(keysToLookUp))
{
}

void Bootstrap() {
LookupSource->AsyncLookup(std::move(KeysToLookUp));
auto guard = Guard(*Alloc);
KeysToLookUp.clear();
KeysToLookUp.shrink_to_fit();
}

private:
static constexpr char ActorName[] = "TEST";

private:
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
NYql::NDq::IDqAsyncLookupSource* LookupSource;
NKikimr::NMiniKQL::TUnboxedValueVector KeysToLookUp;
};

Y_UNIT_TEST(Lookup) {
auto alloc = std::make_shared<NKikimr::NMiniKQL::TScopedAlloc>(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), true, false);
NKikimr::NMiniKQL::TMemoryUsageInfo memUsage("TestMemUsage");
NKikimr::NMiniKQL::THolderFactory holderFactory(alloc->Ref(), memUsage);
NKikimr::NMiniKQL::TTypeEnvironment typeEnv(*alloc);
NKikimr::NMiniKQL::TTypeBuilder typeBuilder(typeEnv);

auto loggerConfig = NYql::NProto::TLoggingConfig();
loggerConfig.set_allcomponentslevel(::NYql::NProto::TLoggingConfig_ELevel::TLoggingConfig_ELevel_TRACE);
NYql::NLog::InitLogger(loggerConfig, false);

TTestActorRuntimeBase runtime;
runtime.Initialize();
auto edge = runtime.AllocateEdgeActor();

NYql::NConnector::NApi::TDataSourceInstance dsi;
dsi.Setkind(NYql::NConnector::NApi::EDataSourceKind::YDB);
dsi.mutable_endpoint()->Sethost("some_host");
dsi.mutable_endpoint()->Setport(2135);
dsi.Setdatabase("some_db");
dsi.Setuse_tls(true);
dsi.set_protocol(::NYql::NConnector::NApi::EProtocol::NATIVE);
auto token = dsi.mutable_credentials() -> mutable_token();
token->Settype("IAM");
token->Setvalue("TEST_TOKEN");

auto connectorMock = std::make_shared<NYql::NConnector::NTest::TConnectorClientMock>();

// clang-format off
// step 1: ListSplits
connectorMock->ExpectListSplits()
.Select()
.DataSourceInstance(dsi)
.What()
.Column("id", Ydb::Type::UINT64)
.NullableColumn("optional_id", Ydb::Type::UINT64)
.NullableColumn("string_value", Ydb::Type::STRING)
.Done()
.Table("lookup_test")
.Where()
.Filter()
.Disjunction()
.Operand()
.Conjunction()
.Operand().Equal().Column("id").Value<ui64>(0).Done().Done()
.Operand().Equal().Column("optional_id").OptionalValue<ui64>(100).Done().Done()
.Done()
.Done()
.Operand()
.Conjunction()
.Operand().Equal().Column("id").Value<ui64>(1).Done().Done()
.Operand().Equal().Column("optional_id").OptionalValue<ui64>(101).Done().Done()
.Done()
.Done()
.Operand()
.Conjunction()
.Operand().Equal().Column("id").Value<ui64>(2).Done().Done()
.Operand().Equal().Column("optional_id").OptionalValue<ui64>(102).Done().Done()
.Done()
.Done()
.Done()
.Done()
.Done()
.Done()
.MaxSplitCount(1)
.Result()
.AddResponse(NewSuccess())
.Description("Actual split info is not important")
;

connectorMock->ExpectReadSplits()
.DataSourceInstance(dsi)
.Split()
.Description("Actual split info is not important")
.Done()
.Result()
.AddResponse(
MakeRecordBatch(
MakeArray<arrow::UInt64Builder, ui64>("id", {0, 1, 2}, arrow::uint64()),
MakeArray<arrow::UInt64Builder, ui64>("optional_id", {100, 101, 103}, arrow::uint64()), //the last value is intentially wrong
MakeArray<arrow::StringBuilder, std::string>("string_value", {"a", "b", "c"}, arrow::utf8())
),
NewSuccess()
)
;
// clang-format on

NYql::Generic::TLookupSource lookupSourceSettings;
*lookupSourceSettings.mutable_data_source_instance() = dsi;
lookupSourceSettings.Settable("lookup_test");
lookupSourceSettings.SetServiceAccountId("testsaid");
lookupSourceSettings.SetServiceAccountIdSignature("fake_signature");

google::protobuf::Any packedLookupSource;
Y_ABORT_UNLESS(packedLookupSource.PackFrom(lookupSourceSettings));

NKikimr::NMiniKQL::TStructTypeBuilder keyTypeBuilder{typeEnv};
keyTypeBuilder.Add("id", typeBuilder.NewDataType(NUdf::EDataSlot::Uint64, false));
keyTypeBuilder.Add("optional_id", typeBuilder.NewDataType(NUdf::EDataSlot::Uint64, true));
NKikimr::NMiniKQL::TStructTypeBuilder outputypeBuilder{typeEnv};
outputypeBuilder.Add("string_value", typeBuilder.NewDataType(NUdf::EDataSlot::String, true));

auto guard = Guard(*alloc.get());

auto [lookupSource, actor] = NYql::NDq::CreateGenericLookupActor(
connectorMock,
std::make_shared<NTestCreds::TSecuredServiceAccountCredentialsFactory>(),
edge,
alloc,
std::move(lookupSourceSettings),
keyTypeBuilder.Build(),
outputypeBuilder.Build(),
typeEnv,
holderFactory,
1'000'000);
runtime.Register(actor);

NKikimr::NMiniKQL::TUnboxedValueVector keys;
for (size_t i = 0; i != 3; ++i) {
NUdf::TUnboxedValue* keyItems;
auto key = holderFactory.CreateDirectArrayHolder(2, keyItems);
keyItems[0] = NUdf::TUnboxedValuePod(ui64(i));
keyItems[1] = NUdf::TUnboxedValuePod(ui64(100 + i));
keys.push_back(std::move(key));
}

guard.Release(); //let actors use alloc

auto callLookupActor = new TCallLookupActor(alloc, lookupSource, std::move(keys));
runtime.Register(callLookupActor);

auto ev = runtime.GrabEdgeEventRethrow<NYql::NDq::IDqAsyncLookupSource::TEvLookupResult>(edge);
auto guard2 = Guard(*alloc.get());
NKikimr::NMiniKQL::TKeyPayloadPairVector lookupResult = std::move(ev->Get()->Data);

UNIT_ASSERT_EQUAL(3, lookupResult.size());
{
auto& [k, v] = lookupResult[0];
UNIT_ASSERT_EQUAL(0, k.GetElement(0).Get<ui64>());
UNIT_ASSERT_EQUAL(100, k.GetElement(1).Get<ui64>());
NUdf::TUnboxedValue val = v.GetElement(0);
UNIT_ASSERT(val.AsStringRef() == TStringBuf("a"));
}
{
auto& [k, v] = lookupResult[1];
UNIT_ASSERT_EQUAL(1, k.GetElement(0).Get<ui64>());
UNIT_ASSERT_EQUAL(101, k.GetElement(1).Get<ui64>());
NUdf::TUnboxedValue val = v.GetElement(0);
UNIT_ASSERT(val.AsStringRef() == TStringBuf("b"));
}
{
auto& [k, v] = lookupResult[2];
UNIT_ASSERT_EQUAL(2, k.GetElement(0).Get<ui64>());
UNIT_ASSERT_EQUAL(102, k.GetElement(1).Get<ui64>());
//this key was not found and reported as empty
UNIT_ASSERT(!v);
}
}

} //Y_UNIT_TEST_SUITE(GenericProviderLookupActor)
2 changes: 2 additions & 0 deletions ydb/library/yql/providers/generic/actors/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,5 @@ PEERDIR(
YQL_LAST_ABI_VERSION()

END()

RECURSE_FOR_TESTS(ut)
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ namespace NYql::NDq {
return MaxKeysInRequest;
}
void AsyncLookup(const NKikimr::NMiniKQL::TUnboxedValueVector& keys) override {
auto guard = Guard(*Alloc);
CreateRequest(keys);
}

Expand Down Expand Up @@ -148,9 +149,8 @@ namespace NYql::NDq {
Y_ABORT_UNLESS(response.splits_size() == 1);
auto& split = response.splits(0);
NConnector::NApi::TReadSplitsRequest readRequest;
*readRequest.mutable_data_source_instance() = LookupSource.data_source_instance();
*readRequest.mutable_data_source_instance() = GetDataSourceInstanceWithToken();
*readRequest.add_splits() = split;
readRequest.Setmode(NConnector::NApi::TReadSplitsRequest_EMode::TReadSplitsRequest_EMode_ORDERED);
readRequest.Setformat(NConnector::NApi::TReadSplitsRequest_EFormat::TReadSplitsRequest_EFormat_ARROW_IPC_STREAMING);
Connector->ReadSplits(readRequest).Subscribe([actorSystem = TActivationContext::ActorSystem(), selfId = SelfId()](const NConnector::TReadSplitsStreamIteratorAsyncResult& asyncResult) {
YQL_CLOG(DEBUG, ProviderGeneric) << "ActorId=" << selfId << " Got ReadSplitsStreamIterator from Connector";
Expand Down Expand Up @@ -269,7 +269,6 @@ namespace NYql::NDq {
for (auto&& k : RequestedKeys) {
LookupResult.emplace_back(std::move(k), NUdf::TUnboxedValue{});
}
RequestedKeys.clear();
auto ev = new IDqAsyncLookupSource::TEvLookupResult(Alloc, std::move(LookupResult));
TActivationContext::ActorSystem()->Send(new NActors::IEventHandle(ParentId, SelfId(), ev));
LookupResult = {};
Expand Down Expand Up @@ -317,12 +316,17 @@ namespace NYql::NDq {
return result;
}

NConnector::NApi::TSelect CreateSelect(const NKikimr::NMiniKQL::TUnboxedValueVector& keys) {
NConnector::NApi::TSelect select;
*select.mutable_data_source_instance() = LookupSource.data_source_instance();
NYql::NConnector::NApi::TDataSourceInstance GetDataSourceInstanceWithToken() const {
auto dsi = LookupSource.data_source_instance();
//Note: returned token may be stale and we have no way to check or recover here
//Consider to redesign ICredentialsProvider
TokenProvider->MaybeFillToken(*select.mutable_data_source_instance());
TokenProvider->MaybeFillToken(dsi);
return dsi;
}

NConnector::NApi::TSelect CreateSelect(const NKikimr::NMiniKQL::TUnboxedValueVector& keys) {
NConnector::NApi::TSelect select;
*select.mutable_data_source_instance() = GetDataSourceInstanceWithToken();

for (ui32 i = 0; i != SelectResultType->GetMembersCount(); ++i) {
auto c = select.mutable_what()->add_items()->mutable_column();
Expand Down Expand Up @@ -378,7 +382,7 @@ namespace NYql::NDq {
std::pair<NYql::NDq::IDqAsyncLookupSource*, NActors::IActor*> CreateGenericLookupActor(
NConnector::IClient::TPtr connectorClient,
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
NActors::TActorId&& parentId,
NActors::TActorId parentId,
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
NYql::Generic::TLookupSource&& lookupSource,
const NKikimr::NMiniKQL::TStructType* keyType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ namespace NYql::NDq {
CreateGenericLookupActor(
NConnector::IClient::TPtr connectorClient,
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
NActors::TActorId&& parentId,
NActors::TActorId parentId,
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
NYql::Generic::TLookupSource&& lookupSource,
const NKikimr::NMiniKQL::TStructType* keyType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,28 @@ namespace NYql::NConnector::NTest {

using namespace fmt::literals;

#define DEFINE_SIMPLE_TYPE_SETTER(T, primitiveTypeId, value_name) \
template <> \
void SetSimpleValue(const T& value, Ydb::TypedValue* proto) { \
proto->mutable_type()->set_type_id(::Ydb::Type::primitiveTypeId); \
proto->mutable_value()->Y_CAT(set_, value_name)(value); \
::Ydb::Type MakeYdbType(::Ydb::Type::PrimitiveTypeId primitiveType, bool optional) {
::Ydb::Type type;
if (optional) {
type.mutable_optional_type()->mutable_item()->Settype_id(primitiveType);
} else {
type.Settype_id(primitiveType);
}
return type;
}

#define DEFINE_SIMPLE_TYPE_SETTER(T, primitiveTypeId, value_name) \
template <> \
void SetSimpleValue(const T& value, Ydb::TypedValue* proto, bool optional) { \
*proto->mutable_type() = MakeYdbType(::Ydb::Type::primitiveTypeId, optional); \
proto->mutable_value()->Y_CAT(set_, value_name)(value); \
}

DEFINE_SIMPLE_TYPE_SETTER(bool, BOOL, bool_value);
DEFINE_SIMPLE_TYPE_SETTER(i32, INT32, int32_value);
DEFINE_SIMPLE_TYPE_SETTER(ui32, UINT32, uint32_value);
DEFINE_SIMPLE_TYPE_SETTER(i64, INT64, int64_value);
DEFINE_SIMPLE_TYPE_SETTER(ui64, UINT64, uint64_value);

void CreatePostgreSQLExternalDataSource(
const std::shared_ptr<NKikimr::NKqp::TKikimrRunner>& kikimr,
Expand Down
Loading

0 comments on commit b098b52

Please sign in to comment.