Skip to content

Commit

Permalink
Merge b9b6906 into 1c441a9
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA authored Nov 14, 2024
2 parents 1c441a9 + b9b6906 commit b184988
Show file tree
Hide file tree
Showing 8 changed files with 265 additions and 178 deletions.
1 change: 1 addition & 0 deletions ydb/core/fq/libs/config/protos/row_dispatcher.proto
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ message TRowDispatcherCoordinatorConfig {
// Settings for the JSON parser used by the row dispatcher.
message TJsonParserConfig {
    uint64 BatchSizeBytes = 1;        // Parser buffer batch size in bytes.
    uint64 BatchCreationTimeoutMs = 2; // Timeout (ms) for batch creation.
    uint64 StaticBufferSize = 3; // (number of rows) * (number of columns) limit; default 10^6 ~ 24 MiB
}

message TRowDispatcherConfig {
Expand Down
37 changes: 24 additions & 13 deletions ydb/core/fq/libs/row_dispatcher/json_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,17 @@ NYT::TNode MakeOutputSchema() {
return NYT::TNode::CreateList().Add("StructType").Add(std::move(structMembers));
}

// Borrowed, non-owning view of a slice of parsed rows handed to the filter
// input consumer (see IConsumer<TInputType>). Holds references only, so the
// referenced vectors must outlive the OnObject call that receives this struct.
struct TInputType {
    const TVector<ui64>& Offsets;  // per-row message offsets for the whole batch
    const TVector<const TVector<NYql::NUdf::TUnboxedValue>*>& Values;  // one vector per column
    const ui64 RowsOffset; // offset of the first value (index into Offsets for row 0 of this slice)
    const ui64 NumberRows; // number of rows in this slice

    // Maps a slice-local row id to its message offset.
    ui64 GetOffset(ui64 rowId) const {
        return Offsets[rowId + RowsOffset];
    }
};

class TFilterInputSpec : public NYql::NPureCalc::TInputSpecBase {
public:
TFilterInputSpec(const NYT::TNode& schema)
Expand All @@ -85,7 +96,7 @@ class TFilterInputSpec : public NYql::NPureCalc::TInputSpecBase {
TVector<NYT::TNode> Schemas;
};

class TFilterInputConsumer : public NYql::NPureCalc::IConsumer<std::pair<const TVector<ui64>&, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*>&>> {
class TFilterInputConsumer : public NYql::NPureCalc::IConsumer<TInputType> {
public:
TFilterInputConsumer(
const TFilterInputSpec& spec,
Expand Down Expand Up @@ -123,26 +134,26 @@ class TFilterInputConsumer : public NYql::NPureCalc::IConsumer<std::pair<const T
}
}

void OnObject(std::pair<const TVector<ui64>&, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*>&> values) override {
Y_ENSURE(FieldsPositions.size() == values.second.size());
void OnObject(TInputType input) override {
Y_ENSURE(FieldsPositions.size() == input.Values.size());

NKikimr::NMiniKQL::TThrowingBindTerminator bind;
with_lock (Worker->GetScopedAlloc()) {
auto& holderFactory = Worker->GetGraph().GetHolderFactory();

// TODO: use blocks here
for (size_t rowId = 0; rowId < values.second.front()->size(); ++rowId) {
for (size_t rowId = 0; rowId < input.NumberRows; ++rowId) {
NYql::NUdf::TUnboxedValue* items = nullptr;

NYql::NUdf::TUnboxedValue result = Cache.NewArray(
holderFactory,
static_cast<ui32>(values.second.size() + 1),
static_cast<ui32>(input.Values.size() + 1),
items);

items[OffsetPosition] = NYql::NUdf::TUnboxedValuePod(values.first[rowId]);
items[OffsetPosition] = NYql::NUdf::TUnboxedValuePod(input.GetOffset(rowId));

size_t fieldId = 0;
for (const auto& column : values.second) {
for (const auto column : input.Values) {
items[FieldsPositions[fieldId++]] = column->at(rowId);
}

Expand Down Expand Up @@ -236,7 +247,7 @@ struct NYql::NPureCalc::TInputSpecTraits<TFilterInputSpec> {
static constexpr bool IsPartial = false;
static constexpr bool SupportPushStreamMode = true;

using TConsumerType = THolder<NYql::NPureCalc::IConsumer<std::pair<const TVector<ui64>&, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*>&>>>;
using TConsumerType = THolder<NYql::NPureCalc::IConsumer<TInputType>>;

static TConsumerType MakeConsumer(
const TFilterInputSpec& spec,
Expand Down Expand Up @@ -282,9 +293,9 @@ class TJsonFilter::TImpl {
LOG_ROW_DISPATCHER_DEBUG("Program created");
}

void Push(const TVector<ui64>& offsets, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*>& values) {
void Push(const TVector<ui64>& offsets, const TVector<const TVector<NYql::NUdf::TUnboxedValue>*>& values, ui64 rowsOffset, ui64 numberRows) {
Y_ENSURE(values, "Expected non empty schema");
InputConsumer->OnObject(std::make_pair(offsets, values));
InputConsumer->OnObject({.Offsets = offsets, .Values = values, .RowsOffset = rowsOffset, .NumberRows = numberRows});
}

TString GetSql() const {
Expand All @@ -305,7 +316,7 @@ class TJsonFilter::TImpl {

private:
THolder<NYql::NPureCalc::TPushStreamProgram<TFilterInputSpec, TFilterOutputSpec>> Program;
THolder<NYql::NPureCalc::IConsumer<std::pair<const TVector<ui64>&, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*>&>>> InputConsumer;
THolder<NYql::NPureCalc::IConsumer<TInputType>> InputConsumer;
const TString Sql;
};

Expand All @@ -322,8 +333,8 @@ TJsonFilter::TJsonFilter(
// Out-of-line destructor: defined here, where TImpl is a complete type, so the
// pimpl member can be destroyed (the header only forward-declares TImpl).
TJsonFilter::~TJsonFilter() {
}

void TJsonFilter::Push(const TVector<ui64>& offsets, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*>& values) {
Impl->Push(offsets, values);
void TJsonFilter::Push(const TVector<ui64>& offsets, const TVector<const TVector<NYql::NUdf::TUnboxedValue>*>& values, ui64 rowsOffset, ui64 numberRows) {
Impl->Push(offsets, values, rowsOffset, numberRows);
}

TString TJsonFilter::GetSql() {
Expand Down
4 changes: 1 addition & 3 deletions ydb/core/fq/libs/row_dispatcher/json_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

#include "common.h"

#include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
#include <yql/essentials/public/udf/udf_data_type.h>
#include <yql/essentials/public/udf/udf_value.h>

namespace NFq {
Expand All @@ -23,7 +21,7 @@ class TJsonFilter {

~TJsonFilter();

void Push(const TVector<ui64>& offsets, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*>& values);
void Push(const TVector<ui64>& offsets, const TVector<const TVector<NYql::NUdf::TUnboxedValue>*>& values, ui64 rowsOffset, ui64 numberRows);
TString GetSql();

private:
Expand Down
98 changes: 60 additions & 38 deletions ydb/core/fq/libs/row_dispatcher/json_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@

#include <yql/essentials/minikql/dom/json.h>
#include <yql/essentials/minikql/invoke_builtins/mkql_builtins.h>
#include <yql/essentials/minikql/mkql_function_registry.h>
#include <yql/essentials/minikql/mkql_node_cast.h>
#include <yql/essentials/minikql/mkql_program_builder.h>
#include <yql/essentials/minikql/mkql_string_util.h>
#include <yql/essentials/minikql/mkql_type_ops.h>
#include <yql/essentials/providers/common/schema/mkql/yql_mkql_schema.h>

#include <library/cpp/containers/absl_flat_hash/flat_hash_map.h>
Expand All @@ -17,6 +19,8 @@ namespace {

TString LogPrefix = "JsonParser: ";

// Default limit for (number of rows) * (number of columns) held in the parser's
// static buffer when TJsonParserConfig.StaticBufferSize is unset (0); ~24 MiB.
constexpr ui64 DEFAULT_STATIC_BUFFER_SIZE = 1000000;

struct TJsonParserBuffer {
size_t NumberValues = 0;
bool Finished = false;
Expand Down Expand Up @@ -80,31 +84,31 @@ class TColumnParser {
const TString TypeYson;
const NKikimr::NMiniKQL::TType* TypeMkql;
const bool IsOptional = false;
size_t NumberValues = 0;
TVector<size_t> ParsedRows;

public:
TColumnParser(const TString& name, const TString& typeYson, NKikimr::NMiniKQL::TProgramBuilder& programBuilder)
TColumnParser(const TString& name, const TString& typeYson, ui64 maxNumberRows, NKikimr::NMiniKQL::TProgramBuilder& programBuilder)
: Name(name)
, TypeYson(typeYson)
, TypeMkql(NYql::NCommon::ParseTypeFromYson(TStringBuf(typeYson), programBuilder, Cerr))
, IsOptional(TypeMkql->IsOptional())
, NumberValues(0)
{
ParsedRows.reserve(maxNumberRows);
try {
Parser = CreateParser(TypeMkql);
} catch (...) {
throw yexception() << "Failed to create parser for column '" << Name << "' with type " << TypeYson << ", description: " << CurrentExceptionMessage();
}
}

void ParseJsonValue(simdjson::builtin::ondemand::value jsonValue, NYql::NUdf::TUnboxedValue& resultValue) {
void ParseJsonValue(ui64 rowId, simdjson::builtin::ondemand::value jsonValue, NYql::NUdf::TUnboxedValue& resultValue) {
Parser(jsonValue, resultValue);
NumberValues++;
ParsedRows.emplace_back(rowId);
}

void ValidateNumberValues(size_t expectedNumberValues, ui64 firstOffset) const {
if (Y_UNLIKELY(!IsOptional && NumberValues < expectedNumberValues)) {
throw yexception() << "Failed to parse json messages, found " << expectedNumberValues - NumberValues << " missing values from offset " << firstOffset << " in non optional column '" << Name << "' with type " << TypeYson;
if (Y_UNLIKELY(!IsOptional && ParsedRows.size() < expectedNumberValues)) {
throw yexception() << "Failed to parse json messages, found " << expectedNumberValues - ParsedRows.size() << " missing values from offset " << firstOffset << " in non optional column '" << Name << "' with type " << TypeYson;
}
}

Expand Down Expand Up @@ -273,11 +277,13 @@ namespace NFq {

class TJsonParser::TImpl {
public:
TImpl(const TVector<TString>& columns, const TVector<TString>& types, ui64 batchSize, TDuration batchCreationTimeout)
TImpl(const TVector<TString>& columns, const TVector<TString>& types, TCallback parseCallback, ui64 batchSize, TDuration batchCreationTimeout, ui64 staticBufferSize)
: Alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), true, false)
, TypeEnv(std::make_unique<NKikimr::NMiniKQL::TTypeEnvironment>(Alloc))
, BatchSize(batchSize)
, MaxNumberRows(((staticBufferSize ? staticBufferSize : DEFAULT_STATIC_BUFFER_SIZE) - 1) / columns.size() + 1)
, BatchCreationTimeout(batchCreationTimeout)
, ParseCallback(parseCallback)
, ParsedValues(columns.size())
{
Y_ENSURE(columns.size() == types.size(), "Number of columns and types should by equal");
Expand All @@ -288,7 +294,7 @@ class TJsonParser::TImpl {

Columns.reserve(columns.size());
for (size_t i = 0; i < columns.size(); i++) {
Columns.emplace_back(columns[i], types[i], programBuilder);
Columns.emplace_back(columns[i], types[i], MaxNumberRows, programBuilder);
}
}

Expand All @@ -297,7 +303,11 @@ class TJsonParser::TImpl {
ColumnsIndex.emplace(std::string_view(Columns[i].Name), i);
}

Buffer.Reserve(BatchSize, 1);
for (size_t i = 0; i < columns.size(); i++) {
ParsedValues[i].resize(MaxNumberRows);
}

Buffer.Reserve(BatchSize, MaxNumberRows);

LOG_ROW_DISPATCHER_INFO("Simdjson active implementation " << simdjson::get_active_implementation()->name());
Parser.threaded = false;
Expand Down Expand Up @@ -330,21 +340,20 @@ class TJsonParser::TImpl {
Buffer.AddMessages(messages);
}

const TVector<NKikimr::NMiniKQL::TUnboxedValueVector>& Parse() {
void Parse() {
Y_ENSURE(Buffer.IsReady(), "Nothing to parse");

const auto [values, size] = Buffer.Finish();
LOG_ROW_DISPATCHER_TRACE("Parse values:\n" << values);

with_lock (Alloc) {
ClearColumns(Buffer.NumberValues);

const ui64 firstOffset = Buffer.Offsets.front();
size_t rowId = 0;
size_t parsedRows = 0;
simdjson::ondemand::document_stream documents = Parser.iterate_many(values, size, simdjson::ondemand::DEFAULT_BATCH_SIZE);
for (auto document : documents) {
if (Y_UNLIKELY(rowId >= Buffer.NumberValues)) {
throw yexception() << "Failed to parse json messages, expected " << Buffer.NumberValues << " json rows from offset " << firstOffset << " but got " << rowId + 1;
if (Y_UNLIKELY(parsedRows >= Buffer.NumberValues)) {
throw yexception() << "Failed to parse json messages, expected " << Buffer.NumberValues << " json rows from offset " << firstOffset << " but got " << parsedRows + 1;
}
for (auto item : document.get_object()) {
const auto it = ColumnsIndex.find(item.escaped_key().value());
Expand All @@ -355,23 +364,28 @@ class TJsonParser::TImpl {
const size_t columnId = it->second;
auto& columnParser = Columns[columnId];
try {
columnParser.ParseJsonValue(item.value(), ParsedValues[columnId][rowId]);
columnParser.ParseJsonValue(rowId, item.value(), ParsedValues[columnId][rowId]);
} catch (...) {
throw yexception() << "Failed to parse json string at offset " << Buffer.Offsets[rowId] << ", got parsing error for column '" << columnParser.Name << "' with type " << columnParser.TypeYson << ", description: " << CurrentExceptionMessage();
}
}

rowId++;
parsedRows++;

if (rowId == MaxNumberRows) {
ClearColumns(parsedRows, MaxNumberRows);
rowId = 0;
}
}

if (rowId != Buffer.NumberValues) {
if (parsedRows != Buffer.NumberValues) {
throw yexception() << "Failed to parse json messages, expected " << Buffer.NumberValues << " json rows from offset " << firstOffset << " but got " << rowId;
}
for (const auto& columnDesc : Columns) {
columnDesc.ValidateNumberValues(rowId, firstOffset);
if (rowId) {
ClearColumns(parsedRows, rowId);
}
}

return ParsedValues;
}

TString GetDescription() const {
Expand All @@ -385,26 +399,32 @@ class TJsonParser::TImpl {

~TImpl() {
with_lock (Alloc) {
ClearColumns(0);
ParsedValues.clear();
Columns.clear();
TypeEnv.reset();
}
}

private:
void ClearColumns(size_t newSize) {
const auto clearValue = [&allocState = Alloc.Ref()](NYql::NUdf::TUnboxedValue& value){
value.UnlockRef(1);
value.Clear();
};
void ClearColumns(size_t parsedRows, size_t savedRows) {
const ui64 firstOffset = Buffer.Offsets.front();
for (const auto& column : Columns) {
column.ValidateNumberValues(savedRows, firstOffset);
}

for (size_t i = 0; i < Columns.size(); ++i) {
Columns[i].NumberValues = 0;
{
auto unguard = Unguard(Alloc);
ParseCallback(parsedRows - savedRows, savedRows, ParsedValues);
}

for (size_t i = 0; i < Columns.size(); ++i) {
auto& parsedColumn = ParsedValues[i];
std::for_each(parsedColumn.begin(), parsedColumn.end(), clearValue);
parsedColumn.resize(newSize);
for (size_t rowId : Columns[i].ParsedRows) {
auto& parsedRow = parsedColumn[rowId];
parsedRow.UnlockRef(1);
parsedRow.Clear();
}
Columns[i].ParsedRows.clear();
}
}

Expand All @@ -413,18 +433,20 @@ class TJsonParser::TImpl {
std::unique_ptr<NKikimr::NMiniKQL::TTypeEnvironment> TypeEnv;

const ui64 BatchSize;
const ui64 MaxNumberRows;
const TDuration BatchCreationTimeout;
const TCallback ParseCallback;
TVector<TColumnParser> Columns;
absl::flat_hash_map<std::string_view, size_t> ColumnsIndex;

TJsonParserBuffer Buffer;
simdjson::ondemand::parser Parser;

TVector<NKikimr::NMiniKQL::TUnboxedValueVector> ParsedValues;
TVector<TVector<NYql::NUdf::TUnboxedValue>> ParsedValues;
};

// Constructs the parser facade, forwarding all settings to the pimpl.
// parseCallback is invoked with each chunk of parsed rows; staticBufferSize
// bounds (rows * columns) kept in the static buffer (0 selects the default).
TJsonParser::TJsonParser(const TVector<TString>& columns, const TVector<TString>& types, TCallback parseCallback, ui64 batchSize, TDuration batchCreationTimeout, ui64 staticBufferSize)
    : Impl(std::make_unique<TJsonParser::TImpl>(columns, types, parseCallback, batchSize, batchCreationTimeout, staticBufferSize))
{}

TJsonParser::~TJsonParser() {
Expand All @@ -450,16 +472,16 @@ const TVector<ui64>& TJsonParser::GetOffsets() const {
return Impl->GetOffsets();
}

const TVector<NKikimr::NMiniKQL::TUnboxedValueVector>& TJsonParser::Parse() {
return Impl->Parse();
void TJsonParser::Parse() {
Impl->Parse();
}

// Returns a human-readable description of the parser (forwards to the pimpl).
TString TJsonParser::GetDescription() const {
    return Impl->GetDescription();
}

// Factory for TJsonParser; forwards all configuration to the constructor.
// Uses `new` rather than std::make_unique in case the constructor is not
// accessible to make_unique — TODO confirm against the header's access level.
std::unique_ptr<TJsonParser> NewJsonParser(const TVector<TString>& columns, const TVector<TString>& types, TJsonParser::TCallback parseCallback, ui64 batchSize, TDuration batchCreationTimeout, ui64 staticBufferSize) {
    return std::unique_ptr<TJsonParser>(new TJsonParser(columns, types, parseCallback, batchSize, batchCreationTimeout, staticBufferSize));
}

} // namespace NFq
Loading

0 comments on commit b184988

Please sign in to comment.