Skip to content

Commit

Permalink
YQ-3803 RD support Json columns without predicate (#11150)
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA authored Oct 31, 2024
1 parent ed7c207 commit 09cea11
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 13 deletions.
41 changes: 28 additions & 13 deletions ydb/core/fq/libs/row_dispatcher/topic_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
private:
using TParserInputType = TSet<std::pair<TString, TString>>;

struct ClientsInfo {
ClientsInfo(const NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev)
struct TClientsInfo {
TClientsInfo(const NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev)
: Settings(ev->Get()->Record)
, ReadActorId(ev->Sender)
{
Expand Down Expand Up @@ -151,7 +151,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
ui64 LastMessageOffset = 0;
bool IsWaitingEvents = false;
bool IsStartParsingScheduled = false;
THashMap<NActors::TActorId, ClientsInfo> Clients;
THashMap<NActors::TActorId, TClientsInfo> Clients;
THashSet<NActors::TActorId> ClientsWithoutPredicate;
std::unique_ptr<TJsonParser> Parser;
NConfig::TRowDispatcherConfig Config;
Expand Down Expand Up @@ -190,15 +190,15 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
void SendToParsing(const TVector<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage>& messages);
void DoParsing(bool force = false);
void DoFiltering(const TVector<ui64>& offsets, const TVector<NKikimr::NMiniKQL::TUnboxedValueVector>& parsedValues);
void SendData(ClientsInfo& info);
void SendData(TClientsInfo& info);
void UpdateParser();
void FatalError(const TString& message, const std::unique_ptr<TJsonFilter>* filter = nullptr);
void SendDataArrived(ClientsInfo& client);
void SendDataArrived(TClientsInfo& client);
void StopReadSession();
TString GetSessionId() const;
void HandleNewEvents();
TInstant GetMinStartingMessageTimestamp() const;
void AddDataToClient(ClientsInfo& client, ui64 offset, const TString& json);
void AddDataToClient(TClientsInfo& client, ui64 offset, const TString& json);

std::pair<NYql::NUdf::TUnboxedValuePod, i64> CreateItem(const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage& message);

Expand All @@ -215,9 +215,9 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {

void SendStatistic();
void SendSessionError(NActors::TActorId readActorId, const TString& message);
TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*> RebuildJson(const ClientsInfo& info, const TVector<NKikimr::NMiniKQL::TUnboxedValueVector>& parsedValues);
TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*> RebuildJson(const TClientsInfo& info, const TVector<NKikimr::NMiniKQL::TUnboxedValueVector>& parsedValues);
void UpdateParserSchema(const TParserInputType& inputType);
void UpdateFieldsIds(ClientsInfo& clientInfo);
void UpdateFieldsIds(TClientsInfo& clientInfo);

private:

Expand Down Expand Up @@ -387,7 +387,7 @@ void TTopicSession::Handle(NFq::TEvPrivate::TEvCreateSession::TPtr&) {
CreateTopicSession();
}

TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*> TTopicSession::RebuildJson(const ClientsInfo& info, const TVector<NKikimr::NMiniKQL::TUnboxedValueVector>& parsedValues) {
TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*> TTopicSession::RebuildJson(const TClientsInfo& info, const TVector<NKikimr::NMiniKQL::TUnboxedValueVector>& parsedValues) {
TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*> result;
const auto& offsets = ParserSchema.FieldsMap;
result.reserve(info.FieldsIds.size());
Expand Down Expand Up @@ -601,7 +601,7 @@ void TTopicSession::DoFiltering(const TVector<ui64>& offsets, const TVector<NKik
Send(SelfId(), new TEvPrivate::TEvDataFiltered(offsets.back()));
}

void TTopicSession::SendData(ClientsInfo& info) {
void TTopicSession::SendData(TClientsInfo& info) {
info.DataArrivedSent = false;
if (info.Buffer.empty()) {
LOG_ROW_DISPATCHER_TRACE("Buffer empty");
Expand Down Expand Up @@ -639,7 +639,7 @@ void TTopicSession::SendData(ClientsInfo& info) {
info.LastSendedNextMessageOffset = *info.NextMessageOffset;
}

void TTopicSession::UpdateFieldsIds(ClientsInfo& info) {
void TTopicSession::UpdateFieldsIds(TClientsInfo& info) {
for (auto name : info.Settings.GetSource().GetColumns()) {
auto it = FieldsIndexes.find(name);
if (it == FieldsIndexes.end()) {
Expand All @@ -652,6 +652,15 @@ void TTopicSession::UpdateFieldsIds(ClientsInfo& info) {
}
}

// Tells whether at least one column type listed in the topic source settings
// contains the text "Json". Returns false when no listed type matches,
// which includes the case of an empty column type list.
bool HasJsonColumns(const NYql::NPq::NProto::TDqPqTopicSource& sourceParams) {
    const auto& columnTypes = sourceParams.GetColumnTypes();
    auto current = columnTypes.begin();
    const auto last = columnTypes.end();
    // Advance until the first type mentioning "Json" or the end of the list.
    while (current != last && !current->Contains("Json")) {
        ++current;
    }
    return current != last;
}

void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
auto it = Clients.find(ev->Sender);
if (it != Clients.end()) {
Expand All @@ -678,6 +687,12 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
UpdateFieldsIds(clientInfo);

TString predicate = clientInfo.Settings.GetSource().GetPredicate();

// TODO: remove this when the re-parsing is removed from pq read actor
if (predicate.empty() && HasJsonColumns(clientInfo.Settings.GetSource())) {
predicate = "WHERE TRUE";
}

if (!predicate.empty()) {
clientInfo.Filter = NewJsonFilter(
columns,
Expand Down Expand Up @@ -710,7 +725,7 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
}
}

void TTopicSession::AddDataToClient(ClientsInfo& info, ui64 offset, const TString& json) {
void TTopicSession::AddDataToClient(TClientsInfo& info, ui64 offset, const TString& json) {
if (info.NextMessageOffset && offset < info.NextMessageOffset) {
return;
}
Expand Down Expand Up @@ -836,7 +851,7 @@ void TTopicSession::StopReadSession() {
TopicClient.Reset();
}

void TTopicSession::SendDataArrived(ClientsInfo& info) {
void TTopicSession::SendDataArrived(TClientsInfo& info) {
if (info.Buffer.empty() || info.DataArrivedSent) {
return;
}
Expand Down
30 changes: 30 additions & 0 deletions ydb/tests/fq/yds/test_row_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,36 @@ def test_nested_types(self, kikimr, client):
issues = str(client.describe_query(query_id).result.query.transient_issue)
assert "Row dispatcher will use the predicate:" in issues, "Incorrect Issues: " + issues

# Integration test: a streaming query that selects a Json column and has no
# WHERE clause must still deliver the column values to the output topic when
# the connection is created with shared_reading=True.
@yq_v1
def test_nested_types_without_predicate(self, kikimr, client):
# Create the YDS connection with shared reading enabled.
client.create_yds_connection(
YDS_CONNECTION, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"), shared_reading=True
)
self.init_topics("test_nested_types_without_predicate")

# The query has no predicate and reads only the `data` column, declared as Json NOT NULL.
sql = Rf'''
INSERT INTO {YDS_CONNECTION}.`{self.output_topic}`
SELECT data FROM {YDS_CONNECTION}.`{self.input_topic}`
WITH (format=json_each_row, SCHEMA (time UInt64 NOT NULL, data Json NOT NULL, event String NOT NULL));'''

query_id = start_yds_query(kikimr, client, sql)
# Wait until exactly one FQ_ROW_DISPATCHER_SESSION actor is reported before writing input.
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1)

# Input messages: `data` holds a JSON object in the first and a JSON array in the second.
data = [
'{"time": 101, "data": {"key": "value"}, "event": "event1"}',
'{"time": 102, "data": ["key1", "key2"], "event": "event2"}'
]

self.write_stream(data)
# Only the value of the `data` column is expected in the output topic, one entry per input message.
expected = [
'{"key": "value"}',
'["key1", "key2"]'
]
assert self.read_stream(len(expected), topic_path=self.output_topic) == expected

# Check that one DQ_PQ_READ_ACTOR is reported, then stop the query.
wait_actor_count(kikimr, "DQ_PQ_READ_ACTOR", 1)
stop_yds_query(client, query_id)

@yq_v1
def test_filter(self, kikimr, client):
client.create_yds_connection(
Expand Down

0 comments on commit 09cea11

Please sign in to comment.