From ec1194e4b0e7e9c2212890662eea1c3989949dd2 Mon Sep 17 00:00:00 2001
From: Gimi Liang
Date: Tue, 13 Feb 2024 11:10:53 -0800
Subject: [PATCH 01/16] Extracted KafkaSchemaRegistry from AvroRowInputFormat

---
 src/Formats/KafkaSchemaRegistry.cpp | 99 +++++++++++
 src/Formats/KafkaSchemaRegistry.h | 22 +++
 .../Formats/Impl/AvroRowInputFormat.cpp | 163 +++++++++---------
 3 files changed, 207 insertions(+), 77 deletions(-)
 create mode 100644 src/Formats/KafkaSchemaRegistry.cpp
 create mode 100644 src/Formats/KafkaSchemaRegistry.h

diff --git a/src/Formats/KafkaSchemaRegistry.cpp b/src/Formats/KafkaSchemaRegistry.cpp
new file mode 100644
index 00000000000..a81cb121bb3
--- /dev/null
+++ b/src/Formats/KafkaSchemaRegistry.cpp
@@ -0,0 +1,99 @@
+#include
+#include
+#include
+#include
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+extern const int INCORRECT_DATA;
+}
+
+String KafkaSchemaRegistry::fetchSchema(const Poco::URI & base_url, UInt32 id)
+{
+ assert(!base_url.empty());
+
+ try
+ {
+ try
+ {
+ Poco::URI url(base_url, "/schemas/ids/" + std::to_string(id));
+ LOG_TRACE((&Poco::Logger::get("AvroConfluentRowInputFormat")), "Fetching schema id = {}", id);
+
+ /// One second for connect/send/receive. Just in case.
+ ConnectionTimeouts timeouts({1, 0}, {1, 0}, {1, 0});
+
+ Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_GET, url.getPathAndQuery(), Poco::Net::HTTPRequest::HTTP_1_1);
+ request.setHost(url.getHost());
+
+ auto session = makePooledHTTPSession(url, timeouts, 1);
+ std::istream * response_body{};
+ try
+ {
+ session->sendRequest(request);
+
+ Poco::Net::HTTPResponse response;
+ response_body = receiveResponse(*session, request, response, false);
+ }
+ catch (const Poco::Exception & e)
+ {
+ /// We use the session's data storage to store the exception text.
+ /// Based on it, we can decide whether to reconnect the session or re-resolve the session host.
+ session->attachSessionData(e.message());
+ throw;
+ }
+ Poco::JSON::Parser parser;
+ auto json_body = parser.parse(*response_body).extract();
+ auto schema = json_body->getValue("schema");
+ LOG_TRACE((&Poco::Logger::get("KafkaSchemaRegistry")), "Successfully fetched schema id = {}\n{}", id, schema);
+ return schema;
+ }
+ catch (const Exception &)
+ {
+ throw;
+ }
+ catch (const Poco::Exception & e)
+ {
+ throw Exception(Exception::CreateFromPocoTag{}, e);
+ }
+ }
+ catch (Exception & e)
+ {
+ e.addMessage("while fetching schema id = " + std::to_string(id));
+ throw;
+ }
+}
+
+UInt32 KafkaSchemaRegistry::readSchemaId(ReadBuffer & in)
+{
+ uint8_t magic;
+ uint32_t schema_id;
+
+ try
+ {
+ readBinaryBigEndian(magic, in);
+ readBinaryBigEndian(schema_id, in);
+ }
+ catch (const Exception & e)
+ {
+ if (e.code() == ErrorCodes::CANNOT_READ_ALL_DATA)
+ {
+ /// empty or incomplete message without magic byte or schema id
+ throw Exception("Missing magic byte or schema identifier.", ErrorCodes::INCORRECT_DATA);
+ }
+ else
+ throw;
+ }
+
+ if (magic != 0x00)
+ {
+ throw Exception("Invalid magic byte before schema identifier."
+ " Must be zero byte, found " + std::to_string(int(magic)) + " instead", ErrorCodes::INCORRECT_DATA); + } + + return schema_id; +} + +} diff --git a/src/Formats/KafkaSchemaRegistry.h b/src/Formats/KafkaSchemaRegistry.h new file mode 100644 index 00000000000..a3afa40c5c0 --- /dev/null +++ b/src/Formats/KafkaSchemaRegistry.h @@ -0,0 +1,22 @@ +#pragma once + +#include +#include + +namespace DB +{ + +class KafkaSchemaRegistry final +{ +public: + static KafkaSchemaRegistry & instance() + { + static KafkaSchemaRegistry ret {}; + return ret; + } + + UInt32 readSchemaId(ReadBuffer & in); + String fetchSchema(const Poco::URI & base_url, UInt32 id); +}; + +} diff --git a/src/Processors/Formats/Impl/AvroRowInputFormat.cpp b/src/Processors/Formats/Impl/AvroRowInputFormat.cpp index 9759aee830b..9d4d36ab94f 100644 --- a/src/Processors/Formats/Impl/AvroRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/AvroRowInputFormat.cpp @@ -1,5 +1,6 @@ #include "AvroRowInputFormat.h" #include "DataTypes/DataTypeLowCardinality.h" +#include "Formats/KafkaSchemaRegistry.h" #if USE_AVRO #include @@ -663,59 +664,65 @@ class AvroConfluentRowInputFormat::SchemaRegistry private: avro::ValidSchema fetchSchema(uint32_t id) { - try - { + /// proton: starts + /// try + /// { try { - Poco::URI url(base_url, "/schemas/ids/" + std::to_string(id)); - LOG_TRACE((&Poco::Logger::get("AvroConfluentRowInputFormat")), "Fetching schema id = {}", id); - - /// One second for connect/send/receive. Just in case. - ConnectionTimeouts timeouts({1, 0}, {1, 0}, {1, 0}); - - Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_GET, url.getPathAndQuery(), Poco::Net::HTTPRequest::HTTP_1_1); - request.setHost(url.getHost()); - - auto session = makePooledHTTPSession(url, timeouts, 1); - std::istream * response_body{}; - try - { - session->sendRequest(request); - - Poco::Net::HTTPResponse response; - response_body = receiveResponse(*session, request, response, false); - } - catch (const Poco::Exception & e) - { - /// We use session data storage as storage for exception text - /// Depend on it we can deduce to reconnect session or reresolve session host - session->attachSessionData(e.message()); - throw; - } - Poco::JSON::Parser parser; - auto json_body = parser.parse(*response_body).extract(); - auto schema = json_body->getValue("schema"); - LOG_TRACE((&Poco::Logger::get("AvroConfluentRowInputFormat")), "Successfully fetched schema id = {}\n{}", id, schema); + /// Poco::URI url(base_url, "/schemas/ids/" + std::to_string(id)); + /// LOG_TRACE((&Poco::Logger::get("AvroConfluentRowInputFormat")), "Fetching schema id = {}", id); + /// + /// /// One second for connect/send/receive. Just in case. 
+ /// ConnectionTimeouts timeouts({1, 0}, {1, 0}, {1, 0}); + /// + /// Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_GET, url.getPathAndQuery(), Poco::Net::HTTPRequest::HTTP_1_1); + /// request.setHost(url.getHost()); + /// + /// auto session = makePooledHTTPSession(url, timeouts, 1); + /// std::istream * response_body{}; + /// try + /// { + /// session->sendRequest(request); + /// + /// Poco::Net::HTTPResponse response; + /// response_body = receiveResponse(*session, request, response, false); + /// } + /// catch (const Poco::Exception & e) + /// { + /// /// We use session data storage as storage for exception text + /// /// Depend on it we can deduce to reconnect session or reresolve session host + /// session->attachSessionData(e.message()); + /// throw; + /// } + /// Poco::JSON::Parser parser; + /// auto json_body = parser.parse(*response_body).extract(); + /// auto schema = json_body->getValue("schema"); + /// LOG_TRACE((&Poco::Logger::get("AvroConfluentRowInputFormat")), "Successfully fetched schema id = {}\n{}", id, schema); + + auto schema = KafkaSchemaRegistry::instance().fetchSchema(base_url, id); return avro::compileJsonSchemaFromString(schema); } - catch (const Exception &) - { - throw; - } - catch (const Poco::Exception & e) - { - throw Exception(Exception::CreateFromPocoTag{}, e); - } + /// catch (const Exception &) + /// { + /// throw; + /// } + /// catch (const Poco::Exception & e) + /// { + /// throw Exception(Exception::CreateFromPocoTag{}, e); + /// } catch (const avro::Exception & e) { - throw Exception(e.what(), ErrorCodes::INCORRECT_DATA); + auto ex = Exception(e.what(), ErrorCodes::INCORRECT_DATA); + ex.addMessage("while fetching schema id = " + std::to_string(id)); + throw std::move(ex); } - } - catch (Exception & e) - { - e.addMessage("while fetching schema id = " + std::to_string(id)); - throw; - } + /// } + /// catch (Exception & e) + /// { + /// e.addMessage("while fetching schema id = " + std::to_string(id)); + /// throw; + /// } + /// proton: ends } Poco::URI base_url; @@ -740,35 +747,37 @@ static std::shared_ptr getConfluentSchemaRegistry(const return schema_registry; } -static uint32_t readConfluentSchemaId(ReadBuffer & in) -{ - uint8_t magic; - uint32_t schema_id; - - try - { - readBinaryBigEndian(magic, in); - readBinaryBigEndian(schema_id, in); - } - catch (const Exception & e) - { - if (e.code() == ErrorCodes::CANNOT_READ_ALL_DATA) - { - /* empty or incomplete message without Avro Confluent magic number or schema id */ - throw Exception("Missing AvroConfluent magic byte or schema identifier.", ErrorCodes::INCORRECT_DATA); - } - else - throw; - } - - if (magic != 0x00) - { - throw Exception("Invalid magic byte before AvroConfluent schema identifier." 
- " Must be zero byte, found " + std::to_string(int(magic)) + " instead", ErrorCodes::INCORRECT_DATA); - } - - return schema_id; -} +/// proton: starts +/// static uint32_t readConfluentSchemaId(ReadBuffer & in) +/// { +/// uint8_t magic; +/// uint32_t schema_id; +/// +/// try +/// { +/// readBinaryBigEndian(magic, in); +/// readBinaryBigEndian(schema_id, in); +/// } +/// catch (const Exception & e) +/// { +/// if (e.code() == ErrorCodes::CANNOT_READ_ALL_DATA) +/// { +/// /* empty or incomplete message without Avro Confluent magic number or schema id */ +/// throw Exception("Missing AvroConfluent magic byte or schema identifier.", ErrorCodes::INCORRECT_DATA); +/// } +/// else +/// throw; +/// } +/// +/// if (magic != 0x00) +/// { +/// throw Exception("Invalid magic byte before AvroConfluent schema identifier." +/// " Must be zero byte, found " + std::to_string(int(magic)) + " instead", ErrorCodes::INCORRECT_DATA); +/// } +/// +/// return schema_id; +/// } +/// proton: ends AvroConfluentRowInputFormat::AvroConfluentRowInputFormat( const Block & header_, ReadBuffer & in_, Params params_, const FormatSettings & format_settings_) @@ -793,7 +802,7 @@ bool AvroConfluentRowInputFormat::readRow(MutableColumns & columns, RowReadExten { return false; } - SchemaId schema_id = readConfluentSchemaId(*in); + SchemaId schema_id = KafkaSchemaRegistry::instance().readSchemaId(*in); const auto & deserializer = getOrCreateDeserializer(schema_id); deserializer.deserializeRow(columns, *decoder, ext); decoder->drain(); @@ -828,7 +837,7 @@ NamesAndTypesList AvroSchemaReader::readSchema() avro::NodePtr root_node; if (confluent) { - UInt32 schema_id = readConfluentSchemaId(in); + UInt32 schema_id = KafkaSchemaRegistry::instance().readSchemaId(in); root_node = getConfluentSchemaRegistry(format_settings)->getSchema(schema_id).root(); } else From 5c4a73563c07514831a280e76af41af67137d9e9 Mon Sep 17 00:00:00 2001 From: Gimi Liang Date: Tue, 20 Feb 2024 01:42:21 -0800 Subject: [PATCH 02/16] Schema registry support for Protobuf --- src/Core/Settings.h | 1 + src/Formats/FormatFactory.cpp | 3 + src/Formats/FormatSettings.h | 3 + src/Formats/KafkaSchemaRegistry.cpp | 6 +- src/Formats/KafkaSchemaRegistry.h | 3 +- src/Formats/ProtobufSchemas.cpp | 86 ++++++++- src/Formats/ProtobufSchemas.h | 14 ++ .../Formats/Impl/AvroRowInputFormat.cpp | 6 +- .../Formats/Impl/ProtobufRowInputFormat.cpp | 170 +++++++++++++----- .../Formats/Impl/ProtobufRowInputFormat.h | 35 +++- 10 files changed, 272 insertions(+), 55 deletions(-) diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 925ffa9e552..4c0f7fff47f 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -769,6 +769,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value) M(EnumComparingMode, format_capn_proto_enum_comparising_mode, FormatSettings::EnumComparingMode::BY_VALUES, "How to map proton Enum and CapnProto Enum", 0)\ M(String, rawstore_time_extraction_type, "", "_tp_time extraction type (string, json, regex)", 0) \ M(String, rawstore_time_extraction_rule, "", "_tp_time extraction rule (string, json, regex)", 0) \ + M(URI, kafka_schema_registry_url, "", "For ProtobufSingle format: Kafka Schema Registry URL.", 0) \ /** proton: ends. */ // End of FORMAT_FACTORY_SETTINGS // Please add settings non-related to formats into the COMMON_SETTINGS above. 
diff --git a/src/Formats/FormatFactory.cpp b/src/Formats/FormatFactory.cpp
index 0e0b9c71ff8..feb935eea0a 100644
--- a/src/Formats/FormatFactory.cpp
+++ b/src/Formats/FormatFactory.cpp
@@ -109,6 +109,9 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
 format_settings.schema.format_schema = settings.format_schema;
 format_settings.schema.format_schema_path = context->getFormatSchemaPath();
 format_settings.schema.is_server = context->hasGlobalContext() && (context->getGlobalContext()->getApplicationType() == Context::ApplicationType::SERVER);
+ /// proton: starts
+ format_settings.schema.kafka_schema_registry_url = settings.kafka_schema_registry_url.toString();
+ /// proton: ends
 format_settings.skip_unknown_fields = settings.input_format_skip_unknown_fields;
 format_settings.template_settings.resultset_format = settings.format_template_resultset;
 format_settings.template_settings.row_between_delimiter = settings.format_template_rows_between_delimiter;
diff --git a/src/Formats/FormatSettings.h b/src/Formats/FormatSettings.h
index 08ff2975483..cb154d16454 100644
--- a/src/Formats/FormatSettings.h
+++ b/src/Formats/FormatSettings.h
@@ -207,6 +207,9 @@ struct FormatSettings
 std::string format_schema;
 std::string format_schema_path;
 bool is_server = false;
+ /// proton: starts
+ std::string kafka_schema_registry_url;
+ /// proton: ends
 } schema;
 struct
diff --git a/src/Formats/KafkaSchemaRegistry.cpp b/src/Formats/KafkaSchemaRegistry.cpp
index a81cb121bb3..d3c7b09c43c 100644
--- a/src/Formats/KafkaSchemaRegistry.cpp
+++ b/src/Formats/KafkaSchemaRegistry.cpp
@@ -2,6 +2,7 @@
 #include
 #include
 #include
+#include
 namespace DB
 {
@@ -11,7 +12,7 @@ namespace ErrorCodes
 extern const int INCORRECT_DATA;
 }
-String KafkaSchemaRegistry::fetchSchema(const Poco::URI & base_url, UInt32 id)
+String KafkaSchemaRegistry::fetchSchema(const Poco::URI & base_url, UInt32 id, const String & username, const String & password)
 {
 assert(!base_url.empty());
@@ -28,6 +29,9 @@ String KafkaSchemaRegistry::fetchSchema(const Poco::URI & base_url, UInt32 id, c
 Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_GET, url.getPathAndQuery(), Poco::Net::HTTPRequest::HTTP_1_1);
 request.setHost(url.getHost());
+ if (!username.empty())
+ Poco::Net::HTTPBasicCredentials(username, password).authenticate(request);
+
 auto session = makePooledHTTPSession(url, timeouts, 1);
 std::istream * response_body{};
 try
diff --git a/src/Formats/KafkaSchemaRegistry.h b/src/Formats/KafkaSchemaRegistry.h
index a3afa40c5c0..16d285a9d32 100644
--- a/src/Formats/KafkaSchemaRegistry.h
+++ b/src/Formats/KafkaSchemaRegistry.h
@@ -6,6 +6,7 @@
 namespace DB
 {
+/// A helper class for working with the Kafka schema registry.
class KafkaSchemaRegistry final { public: @@ -16,7 +17,7 @@ class KafkaSchemaRegistry final } UInt32 readSchemaId(ReadBuffer & in); - String fetchSchema(const Poco::URI & base_url, UInt32 id); + String fetchSchema(const Poco::URI & base_url, UInt32 id, const String & username = "", const String & password = ""); }; } diff --git a/src/Formats/ProtobufSchemas.cpp b/src/Formats/ProtobufSchemas.cpp index dbf70489814..58bb7ebe033 100644 --- a/src/Formats/ProtobufSchemas.cpp +++ b/src/Formats/ProtobufSchemas.cpp @@ -1,11 +1,13 @@ #include "config.h" #if USE_PROTOBUF +# include +# include # include -# include +# include # include +# include # include -# include namespace DB @@ -106,6 +108,86 @@ const google::protobuf::Descriptor * ProtobufSchemas::getMessageTypeForFormatSch } /// proton: starts +class ProtobufSchemas::SchemaRegistry +{ +public: + explicit SchemaRegistry(const std::string & base_url_, size_t schema_cache_max_size = 1000) + : base_url(base_url_), schema_cache(schema_cache_max_size) + { + if (base_url.empty()) + throw Exception("Empty Schema Registry URL", ErrorCodes::BAD_ARGUMENTS); + } + + google::protobuf::FileDescriptorProto getSchema(uint32_t id) + { + auto [schema, loaded] = schema_cache.getOrSet( + id, + [this, id](){ return std::make_shared(fetchSchema(id)); } + ); + return *schema; + } + +private: + google::protobuf::FileDescriptorProto fetchSchema(uint32_t id) + { + auto schema = KafkaSchemaRegistry::instance().fetchSchema(base_url, id); + google::protobuf::io::ArrayInputStream input{schema.data(), static_cast(schema.size())}; + ErrorCollector error_collector; + google::protobuf::io::Tokenizer tokenizer(&input, &error_collector); + google::protobuf::FileDescriptorProto descriptor; + google::protobuf::compiler::Parser parser; + + parser.RecordErrorsTo(&error_collector); + parser.Parse(&tokenizer, &descriptor); + if (!error_collector.getErrors().empty()) + throw Exception(ErrorCodes::CANNOT_PARSE_PROTOBUF_SCHEMA, "Invalid protobuf schema FIXME"); + + return descriptor; + } + + Poco::URI base_url; + LRUCache schema_cache; +}; + +using ConfluentSchemaRegistry = ProtobufSchemas::SchemaRegistry; +#define SCHEMA_REGISTRY_CACHE_MAX_SIZE 1000 +/// Cache of Schema Registry URL -> SchemaRegistry +static LRUCache schema_registry_cache(SCHEMA_REGISTRY_CACHE_MAX_SIZE); + +static std::shared_ptr getConfluentSchemaRegistry(const String & base_url) +{ + auto [schema_registry, loaded] = schema_registry_cache.getOrSet( + base_url, + [base_url]() + { + return std::make_shared(base_url); + } + ); + return schema_registry; +} +const google::protobuf::Descriptor * ProtobufSchemas::getMessageTypeForSchemaRegistry(const String & base_url, UInt32 schema_id) +{ + return getMessageTypeForSchemaRegistry(base_url, schema_id, {0}); +} + +const google::protobuf::Descriptor * ProtobufSchemas::getMessageTypeForSchemaRegistry(const String & base_url, UInt32 schema_id, const std::vector & indexes) +{ + assert(!indexes.empty()); + + auto registry = getConfluentSchemaRegistry(base_url); + const auto *fd = registry_pool()->BuildFile(registry->getSchema(schema_id)); + + const auto *descriptor = fd->message_type(indexes[0]); + + for (auto i : indexes) + { + if (i > static_cast(descriptor->nested_type_count())) + throw Exception(ErrorCodes::INVALID_DATA, "Invalid message index={} max_index={}", i, fd->message_type_count()); + descriptor = descriptor->nested_type(i); + } + return descriptor; +} + SchemaValidationErrors ProtobufSchemas::validateSchema(std::string_view schema) { google::protobuf::io::ArrayInputStream 
input{schema.data(), static_cast(schema.size())}; diff --git a/src/Formats/ProtobufSchemas.h b/src/Formats/ProtobufSchemas.h index 187b6b6c84e..b17cce93550 100644 --- a/src/Formats/ProtobufSchemas.h +++ b/src/Formats/ProtobufSchemas.h @@ -8,6 +8,7 @@ #include #include #include +#include namespace google @@ -42,12 +43,25 @@ class ProtobufSchemas : private boost::noncopyable const google::protobuf::Descriptor * getMessageTypeForFormatSchema(const FormatSchemaInfo & info); /// proton: starts + class SchemaRegistry; + + /// Fetches the schema from the (Kafka) schema registry and returns the descriptor of the first message type. + const google::protobuf::Descriptor * getMessageTypeForSchemaRegistry(const String & base_url, UInt32 schema_id); + /// Fetches the schema from the (Kafka) schema registry and returns the descriptor of the message type indicated by the indexes. + const google::protobuf::Descriptor * getMessageTypeForSchemaRegistry(const String & base_url, UInt32 schema_id, const std::vector & indexes); + SchemaValidationErrors validateSchema(std::string_view schema); /// proton: ends private: class ImporterWithSourceTree; std::unordered_map> importers; std::mutex mutex; + + /// proton: starts + google::protobuf::DescriptorPool registry_pool_; + + google::protobuf::DescriptorPool* registry_pool() { return ®istry_pool_; } + /// proton: ends }; } diff --git a/src/Processors/Formats/Impl/AvroRowInputFormat.cpp b/src/Processors/Formats/Impl/AvroRowInputFormat.cpp index 9d4d36ab94f..f3dfb24fd95 100644 --- a/src/Processors/Formats/Impl/AvroRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/AvroRowInputFormat.cpp @@ -1,6 +1,5 @@ #include "AvroRowInputFormat.h" #include "DataTypes/DataTypeLowCardinality.h" -#include "Formats/KafkaSchemaRegistry.h" #if USE_AVRO #include @@ -66,6 +65,9 @@ #include #include +/// proton: starts +#include +/// proton: ends namespace DB { @@ -97,7 +99,7 @@ class InputStreamReadBufferAdapter : public avro::InputStream *data = reinterpret_cast(in.position()); *len = in.available(); - in.position() += in.available(); + in.position() += in.available(); return true; } diff --git a/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp b/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp index 454bf5236f1..d5b65cebe2a 100644 --- a/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp @@ -1,21 +1,26 @@ #include "ProtobufRowInputFormat.h" +#include "Common/LRUCache.h" #if USE_PROTOBUF -# include -# include -# include -# include -# include -# include -# include -# include -# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +/// proton: starts +# include +/// proton: ends namespace DB { -ProtobufRowInputFormat::ProtobufRowInputFormat(ReadBuffer & in_, const Block & header_, const Params & params_, const FormatSchemaInfo & schema_info_, bool with_length_delimiter_) +ProtobufRowInputFormat::ProtobufRowInputFormat( + ReadBuffer & in_, const Block & header_, const Params & params_, const FormatSchemaInfo & schema_info_, bool with_length_delimiter_) : IRowInputFormat(header_, in_, params_, ProcessorID::ProtobufRowInputFormatID) , reader(std::make_unique(in_)) , serializer(ProtobufSerializer::create( @@ -24,7 +29,7 @@ ProtobufRowInputFormat::ProtobufRowInputFormat(ReadBuffer & in_, const Block & h missing_column_indices, *ProtobufSchemas::instance().getMessageTypeForFormatSchema(schema_info_), with_length_delimiter_, - *reader)) + *reader)) { } @@ -66,30 
+71,45 @@ void ProtobufRowInputFormat::syncAfterError() reader->endMessage(true); } -void registerInputFormatProtobuf(FormatFactory & factory) -{ - for (bool with_length_delimiter : {false, true}) - { - factory.registerInputFormat(with_length_delimiter ? "Protobuf" : "ProtobufSingle", [with_length_delimiter]( - ReadBuffer & buf, - const Block & sample, - IRowInputFormat::Params params, - const FormatSettings & settings) +void registerInputFormatProtobuf(FormatFactory & factory){ + /// proton: starts + /// for (bool with_length_delimiter : {false, true}) + /// { + /// factory.registerInputFormat( + /// with_length_delimiter ? "Protobuf" : "ProtobufSingle", + /// [with_length_delimiter]( + /// ReadBuffer & buf, const Block & sample, IRowInputFormat::Params params, const FormatSettings & settings) { + /// return std::make_shared( + /// buf, sample, std::move(params), FormatSchemaInfo(settings, "Protobuf", true), with_length_delimiter); + /// }); + /// } + + factory.registerInputFormat( + "ProtobufSingle", + [](ReadBuffer & buf, const Block & sample, IRowInputFormat::Params params, const FormatSettings & settings) -> std::shared_ptr { - return std::make_shared(buf, sample, std::move(params), - FormatSchemaInfo(settings, "Protobuf", true), - with_length_delimiter); + if (settings.schema.kafka_schema_registry_url.empty()) + return std::make_shared( + buf, sample, std::move(params), FormatSchemaInfo(settings, "Protobuf", true), /*with_length_delimiter=*/false); + + return std::make_shared( + buf, sample, std::move(params), settings); }); - } + + factory.registerInputFormat( + "Protobuf", + [](ReadBuffer & buf, const Block & sample, IRowInputFormat::Params params, const FormatSettings & settings) + { + return std::make_shared( + buf, sample, std::move(params), FormatSchemaInfo(settings, "Protobuf", true), /*with_length_delimiter=*/true); + }); + /// proton: ends + } ProtobufSchemaReader::ProtobufSchemaReader(const FormatSettings & format_settings) : schema_info( - format_settings.schema.format_schema, - "Protobuf", - true, - format_settings.schema.is_server, - format_settings.schema.format_schema_path) + format_settings.schema.format_schema, "Protobuf", true, format_settings.schema.is_server, format_settings.schema.format_schema_path) { } @@ -100,14 +120,69 @@ NamesAndTypesList ProtobufSchemaReader::readSchema() } /// proton: starts +ProtobufConfluentRowInputFormat::ProtobufConfluentRowInputFormat( + ReadBuffer & in_, const Block & header_, Params params_, const FormatSettings & format_settings_) + : IRowInputFormat(header_, in_, params_, ProcessorID::ProtobufRowInputFormatID) + , registry_url(format_settings_.schema.kafka_schema_registry_url) + , reader(std::make_unique(in_)) +{ +} + +void ProtobufConfluentRowInputFormat::setReadBuffer(ReadBuffer & buf) +{ + IInputFormat::setReadBuffer(buf); + reader->setReadBuffer(buf); +} + +void ProtobufConfluentRowInputFormat::syncAfterError() +{ + reader->endMessage(true); +} + +bool ProtobufConfluentRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & row_read_extension) +{ + if (reader->eof()) + return false; + + auto & registry = KafkaSchemaRegistry::instance(); + auto schema_id = registry.readSchemaId(*in); + + auto indexes_count = reader->readSInt(); + std::vector indexes(1); + if (indexes_count < 1) + indexes.push_back(0); + else + { + for (size_t i = 0; i < static_cast(indexes_count); i++) + indexes.push_back(reader->readSInt()); + } + + const auto & header = getPort().getHeader(); + + serializer = ProtobufSerializer::create( + 
header.getNames(), + header.getDataTypes(), + missing_column_indices, + *ProtobufSchemas::instance().getMessageTypeForSchemaRegistry(registry_url, schema_id, indexes), + /*with_length_delimiter=*/false, + *reader); + + size_t row_num = columns.empty() ? 0 : columns[0]->size(); + if (!row_num) + serializer->setColumns(columns.data(), columns.size()); + + serializer->readRow(row_num); + + row_read_extension.read_columns.clear(); + row_read_extension.read_columns.resize(columns.size(), true); + for (size_t column_idx : missing_column_indices) + row_read_extension.read_columns[column_idx] = false; + return true; +} + ProtobufSchemaWriter::ProtobufSchemaWriter(std::string_view schema_body_, const FormatSettings & settings_) : IExternalSchemaWriter(schema_body_, settings_) - , schema_info( - settings.schema.format_schema, - "Protobuf", - false, - settings.schema.is_server, - settings.schema.format_schema_path) + , schema_info(settings.schema.format_schema, "Protobuf", false, settings.schema.is_server, settings.schema.format_schema_path) { } @@ -122,7 +197,7 @@ bool ProtobufSchemaWriter::write(bool replace_if_exist) if (!replace_if_exist) return false; - WriteBufferFromFile write_buffer {schema_info.absoluteSchemaPath()}; + WriteBufferFromFile write_buffer{schema_info.absoluteSchemaPath()}; write_buffer.write(schema_body.data(), schema_body.size()); return true; } @@ -130,24 +205,19 @@ bool ProtobufSchemaWriter::write(bool replace_if_exist) void registerProtobufSchemaReader(FormatFactory & factory) { - factory.registerExternalSchemaReader("Protobuf", [](const FormatSettings & settings) - { - return std::make_shared(settings); - }); + factory.registerExternalSchemaReader( + "Protobuf", [](const FormatSettings & settings) { return std::make_shared(settings); }); factory.registerFileExtension("pb", "Protobuf"); /// proton: starts factory.registerSchemaFileExtension("proto", "Protobuf"); - factory.registerExternalSchemaWriter("Protobuf", [](std::string_view schema_body, const FormatSettings & settings) - { + factory.registerExternalSchemaWriter("Protobuf", [](std::string_view schema_body, const FormatSettings & settings) { return std::make_shared(schema_body, settings); }); /// proton: ends - factory.registerExternalSchemaReader("ProtobufSingle", [](const FormatSettings & settings) - { - return std::make_shared(settings); - }); + factory.registerExternalSchemaReader( + "ProtobufSingle", [](const FormatSettings & settings) { return std::make_shared(settings); }); for (const auto & name : {"Protobuf", "ProtobufSingle"}) factory.registerAdditionalInfoForSchemaCacheGetter( @@ -161,9 +231,13 @@ void registerProtobufSchemaReader(FormatFactory & factory) namespace DB { class FormatFactory; -void registerInputFormatProtobuf(FormatFactory &) {} +void registerInputFormatProtobuf(FormatFactory &) +{ +} -void registerProtobufSchemaReader(FormatFactory &) {} +void registerProtobufSchemaReader(FormatFactory &) +{ +} } #endif diff --git a/src/Processors/Formats/Impl/ProtobufRowInputFormat.h b/src/Processors/Formats/Impl/ProtobufRowInputFormat.h index 907129881a4..6910c613359 100644 --- a/src/Processors/Formats/Impl/ProtobufRowInputFormat.h +++ b/src/Processors/Formats/Impl/ProtobufRowInputFormat.h @@ -30,7 +30,12 @@ class ProtobufSerializer; class ProtobufRowInputFormat final : public IRowInputFormat { public: - ProtobufRowInputFormat(ReadBuffer & in_, const Block & header_, const Params & params_, const FormatSchemaInfo & schema_info_, bool with_length_delimiter_); + ProtobufRowInputFormat( + ReadBuffer & in_, + 
const Block & header_,
+ const Params & params_,
+ const FormatSchemaInfo & schema_info_,
+ bool with_length_delimiter_);
 ~ProtobufRowInputFormat() override;
 String getName() const override { return "ProtobufRowInputFormat"; }
@@ -61,6 +66,34 @@ class ProtobufSchemaReader : public IExternalSchemaReader
 };

 /// proton: starts
+/// Confluent framing + Protobuf binary datum encoding. Mainly used for Kafka.
+/// Uses 3 caches:
+/// 1. global: schema registry cache (base_url -> SchemaRegistry)
+/// 2. SchemaRegistry: schema cache (schema_id -> schema)
+/// 3. ProtobufConfluentRowInputFormat: deserializer cache (schema_id -> ProtobufSerializer)
+/// This is needed because KafkaStorage creates a new instance of InputFormat per batch of messages
+class ProtobufConfluentRowInputFormat final : public IRowInputFormat
+{
+public:
+ ProtobufConfluentRowInputFormat(ReadBuffer & in_, const Block & header_, Params params_, const FormatSettings & format_settings_);
+ String getName() const override { return "ProtobufConfluentRowInputFormat"; }
+
+ void setReadBuffer(ReadBuffer & buf) override;
+
+private:
+ bool readRow(MutableColumns & columns, RowReadExtension & row_read_extension) override;
+
+ bool allowSyncAfterError() const override { return true; }
+ void syncAfterError() override;
+
+ using SchemaId = uint32_t;
+
+ String registry_url;
+ std::unique_ptr reader;
+ std::vector missing_column_indices;
+ std::unique_ptr serializer;
+};
+
 class ProtobufSchemaWriter : public IExternalSchemaWriter
 {
 public:

From 3b348efa444283496e99ee5ec9b5c232c6054132 Mon Sep 17 00:00:00 2001
From: Gimi Liang
Date: Tue, 20 Feb 2024 18:49:58 -0800
Subject: [PATCH 03/16] refactored for auth support

---
 src/Core/Settings.h | 1 +
 src/Formats/FormatFactory.cpp | 1 +
 src/Formats/FormatSettings.h | 1 +
 src/Formats/KafkaSchemaRegistry.cpp | 32 +++-
 src/Formats/KafkaSchemaRegistry.h | 17 +-
 src/Formats/ProtobufSchemas.cpp | 80 --------
 src/Formats/ProtobufSchemas.h | 15 +-
 .../Formats/Impl/AvroRowInputFormat.cpp | 178 +++++++++++-------
 .../Formats/Impl/ProtobufRowInputFormat.cpp | 93 ++++++++-
 .../Formats/Impl/ProtobufRowInputFormat.h | 8 +-
 10 files changed, 243 insertions(+), 183 deletions(-)

diff --git a/src/Core/Settings.h b/src/Core/Settings.h
index 4c0f7fff47f..ffca52e88bc 100644
--- a/src/Core/Settings.h
+++ b/src/Core/Settings.h
@@ -770,6 +770,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
 M(String, rawstore_time_extraction_type, "", "_tp_time extraction type (string, json, regex)", 0) \
 M(String, rawstore_time_extraction_rule, "", "_tp_time extraction rule (string, json, regex)", 0) \
 M(URI, kafka_schema_registry_url, "", "For ProtobufSingle format: Kafka Schema Registry URL.", 0) \
+ M(String, kafka_schema_registry_credentials, "", "Credentials to be used to fetch schema from the `kafka_schema_registry_url`, with format '<username>:<password>'.", 0) \
 /** proton: ends. */
// End of FORMAT_FACTORY_SETTINGS
// Please add settings non-related to formats into the COMMON_SETTINGS above.
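Note: the credentials setting above pairs with the `kafka_schema_registry_url` setting from the previous patch. For the Protobuf path, after the schema id the Confluent framing also carries a varint count followed by that many message indexes, which `ProtobufConfluentRowInputFormat::readRow` uses to pick the message type: index 0 (or an empty index list) selects the first top-level message, and each further index descends into a nested type. A hedged sketch of that descriptor walk — the function name is illustrative, and the patch throws `INVALID_DATA` where this returns `nullptr`:

#include <cstdint>
#include <vector>
#include <google/protobuf/descriptor.h>

/// Walk a Confluent message-index path: indexes[0] picks a top-level message
/// type of the schema file, and each subsequent index picks a nested type of
/// the current message.
const google::protobuf::Descriptor * resolveByIndexes(
    const google::protobuf::FileDescriptor & file, const std::vector<int64_t> & indexes)
{
    if (indexes.empty() || indexes[0] < 0 || indexes[0] >= file.message_type_count())
        return nullptr;
    const google::protobuf::Descriptor * descriptor = file.message_type(static_cast<int>(indexes[0]));
    for (size_t i = 1; i < indexes.size(); ++i)
    {
        if (indexes[i] < 0 || indexes[i] >= descriptor->nested_type_count())
            return nullptr;
        descriptor = descriptor->nested_type(static_cast<int>(indexes[i]));
    }
    return descriptor;
}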
diff --git a/src/Formats/FormatFactory.cpp b/src/Formats/FormatFactory.cpp index feb935eea0a..ec5f24fb0c0 100644 --- a/src/Formats/FormatFactory.cpp +++ b/src/Formats/FormatFactory.cpp @@ -111,6 +111,7 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings) format_settings.schema.is_server = context->hasGlobalContext() && (context->getGlobalContext()->getApplicationType() == Context::ApplicationType::SERVER); /// proton: starts format_settings.schema.kafka_schema_registry_url = settings.kafka_schema_registry_url.toString(); + format_settings.schema.kafka_schema_registry_credentials = settings.kafka_schema_registry_credentials; /// proton: ends format_settings.skip_unknown_fields = settings.input_format_skip_unknown_fields; format_settings.template_settings.resultset_format = settings.format_template_resultset; diff --git a/src/Formats/FormatSettings.h b/src/Formats/FormatSettings.h index cb154d16454..0eb7546ed5a 100644 --- a/src/Formats/FormatSettings.h +++ b/src/Formats/FormatSettings.h @@ -209,6 +209,7 @@ struct FormatSettings bool is_server = false; /// proton: starts std::string kafka_schema_registry_url; + std::string kafka_schema_registry_credentials; /// proton: ends } schema; diff --git a/src/Formats/KafkaSchemaRegistry.cpp b/src/Formats/KafkaSchemaRegistry.cpp index d3c7b09c43c..fede187cd3d 100644 --- a/src/Formats/KafkaSchemaRegistry.cpp +++ b/src/Formats/KafkaSchemaRegistry.cpp @@ -3,6 +3,7 @@ #include #include #include +#include namespace DB { @@ -10,9 +11,25 @@ namespace DB namespace ErrorCodes { extern const int INCORRECT_DATA; +extern const int TYPE_MISMATCH; } -String KafkaSchemaRegistry::fetchSchema(const Poco::URI & base_url, UInt32 id, const String & username, const String & password) +KafkaSchemaRegistry::KafkaSchemaRegistry(const String & base_url_, const String & credentials_): base_url(base_url_) +{ + if (credentials_.empty()) + return; + + auto pos = credentials_.find(':'); + if (pos == credentials_.npos) + credentials.setUsername(credentials_); + else + { + credentials.setUsername(credentials_.substr(0, pos)); + credentials.setPassword(credentials_.substr(pos)); + } +} + +String KafkaSchemaRegistry::fetchSchema(UInt32 id, const String & expected_schema_type) { assert(!base_url.empty()); @@ -21,7 +38,7 @@ String KafkaSchemaRegistry::fetchSchema(const Poco::URI & base_url, UInt32 id, c try { Poco::URI url(base_url, "/schemas/ids/" + std::to_string(id)); - LOG_TRACE((&Poco::Logger::get("AvroConfluentRowInputFormat")), "Fetching schema id = {}", id); + LOG_TRACE((&Poco::Logger::get("KafkaSchemaRegistry")), "Fetching schema id = {}", id); /// One second for connect/send/receive. Just in case. 
ConnectionTimeouts timeouts({1, 0}, {1, 0}, {1, 0});
@@ -29,8 +46,8 @@ String KafkaSchemaRegistry::fetchSchema(const Poco::URI & base_url, UInt32 id, c
 Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_GET, url.getPathAndQuery(), Poco::Net::HTTPRequest::HTTP_1_1);
 request.setHost(url.getHost());
- if (!username.empty())
- Poco::Net::HTTPBasicCredentials(username, password).authenticate(request);
+ if (!credentials.empty())
+ credentials.authenticate(request);
 auto session = makePooledHTTPSession(url, timeouts, 1);
 std::istream * response_body{};
@@ -50,6 +67,13 @@ String KafkaSchemaRegistry::fetchSchema(const Poco::URI & base_url, UInt32 id, c
 }
 Poco::JSON::Parser parser;
 auto json_body = parser.parse(*response_body).extract();
+
+ if (!expected_schema_type.empty())
+ {
+ auto schema_type = json_body->getValue("type");
+ if (boost::iequals(schema_type, expected_schema_type))
+ throw Exception(ErrorCodes::TYPE_MISMATCH, "Expected schema type {}, got {}", expected_schema_type, schema_type);
+ }
 auto schema = json_body->getValue("schema");
 LOG_TRACE((&Poco::Logger::get("KafkaSchemaRegistry")), "Successfully fetched schema id = {}\n{}", id, schema);
 return schema;
diff --git a/src/Formats/KafkaSchemaRegistry.h b/src/Formats/KafkaSchemaRegistry.h
index 16d285a9d32..0e7fce939b0 100644
--- a/src/Formats/KafkaSchemaRegistry.h
+++ b/src/Formats/KafkaSchemaRegistry.h
@@ -1,6 +1,7 @@
 #pragma once
 #include
+#include
 #include
 namespace DB
@@ -10,14 +11,16 @@ namespace DB
 class KafkaSchemaRegistry final
 {
 public:
- static KafkaSchemaRegistry & instance()
- {
- static KafkaSchemaRegistry ret {};
- return ret;
- }
+ static UInt32 readSchemaId(ReadBuffer & in);
- UInt32 readSchemaId(ReadBuffer & in);
- String fetchSchema(const Poco::URI & base_url, UInt32 id, const String & username = "", const String & password = "");
+ /// `credentials_` is expected to be formatted as "<username>:<password>".
+ KafkaSchemaRegistry(const String & base_url_, const String & credentials_); + + String fetchSchema(UInt32 id, const String & expected_schema_type = ""); + +private: + Poco::URI base_url; + Poco::Net::HTTPBasicCredentials credentials; }; } diff --git a/src/Formats/ProtobufSchemas.cpp b/src/Formats/ProtobufSchemas.cpp index 58bb7ebe033..e039442db2a 100644 --- a/src/Formats/ProtobufSchemas.cpp +++ b/src/Formats/ProtobufSchemas.cpp @@ -108,86 +108,6 @@ const google::protobuf::Descriptor * ProtobufSchemas::getMessageTypeForFormatSch } /// proton: starts -class ProtobufSchemas::SchemaRegistry -{ -public: - explicit SchemaRegistry(const std::string & base_url_, size_t schema_cache_max_size = 1000) - : base_url(base_url_), schema_cache(schema_cache_max_size) - { - if (base_url.empty()) - throw Exception("Empty Schema Registry URL", ErrorCodes::BAD_ARGUMENTS); - } - - google::protobuf::FileDescriptorProto getSchema(uint32_t id) - { - auto [schema, loaded] = schema_cache.getOrSet( - id, - [this, id](){ return std::make_shared(fetchSchema(id)); } - ); - return *schema; - } - -private: - google::protobuf::FileDescriptorProto fetchSchema(uint32_t id) - { - auto schema = KafkaSchemaRegistry::instance().fetchSchema(base_url, id); - google::protobuf::io::ArrayInputStream input{schema.data(), static_cast(schema.size())}; - ErrorCollector error_collector; - google::protobuf::io::Tokenizer tokenizer(&input, &error_collector); - google::protobuf::FileDescriptorProto descriptor; - google::protobuf::compiler::Parser parser; - - parser.RecordErrorsTo(&error_collector); - parser.Parse(&tokenizer, &descriptor); - if (!error_collector.getErrors().empty()) - throw Exception(ErrorCodes::CANNOT_PARSE_PROTOBUF_SCHEMA, "Invalid protobuf schema FIXME"); - - return descriptor; - } - - Poco::URI base_url; - LRUCache schema_cache; -}; - -using ConfluentSchemaRegistry = ProtobufSchemas::SchemaRegistry; -#define SCHEMA_REGISTRY_CACHE_MAX_SIZE 1000 -/// Cache of Schema Registry URL -> SchemaRegistry -static LRUCache schema_registry_cache(SCHEMA_REGISTRY_CACHE_MAX_SIZE); - -static std::shared_ptr getConfluentSchemaRegistry(const String & base_url) -{ - auto [schema_registry, loaded] = schema_registry_cache.getOrSet( - base_url, - [base_url]() - { - return std::make_shared(base_url); - } - ); - return schema_registry; -} -const google::protobuf::Descriptor * ProtobufSchemas::getMessageTypeForSchemaRegistry(const String & base_url, UInt32 schema_id) -{ - return getMessageTypeForSchemaRegistry(base_url, schema_id, {0}); -} - -const google::protobuf::Descriptor * ProtobufSchemas::getMessageTypeForSchemaRegistry(const String & base_url, UInt32 schema_id, const std::vector & indexes) -{ - assert(!indexes.empty()); - - auto registry = getConfluentSchemaRegistry(base_url); - const auto *fd = registry_pool()->BuildFile(registry->getSchema(schema_id)); - - const auto *descriptor = fd->message_type(indexes[0]); - - for (auto i : indexes) - { - if (i > static_cast(descriptor->nested_type_count())) - throw Exception(ErrorCodes::INVALID_DATA, "Invalid message index={} max_index={}", i, fd->message_type_count()); - descriptor = descriptor->nested_type(i); - } - return descriptor; -} - SchemaValidationErrors ProtobufSchemas::validateSchema(std::string_view schema) { google::protobuf::io::ArrayInputStream input{schema.data(), static_cast(schema.size())}; diff --git a/src/Formats/ProtobufSchemas.h b/src/Formats/ProtobufSchemas.h index b17cce93550..3869a3e8575 100644 --- a/src/Formats/ProtobufSchemas.h +++ b/src/Formats/ProtobufSchemas.h @@ 
-3,6 +3,8 @@ #include "config.h" #if USE_PROTOBUF +#include + #include #include #include @@ -43,25 +45,12 @@ class ProtobufSchemas : private boost::noncopyable const google::protobuf::Descriptor * getMessageTypeForFormatSchema(const FormatSchemaInfo & info); /// proton: starts - class SchemaRegistry; - - /// Fetches the schema from the (Kafka) schema registry and returns the descriptor of the first message type. - const google::protobuf::Descriptor * getMessageTypeForSchemaRegistry(const String & base_url, UInt32 schema_id); - /// Fetches the schema from the (Kafka) schema registry and returns the descriptor of the message type indicated by the indexes. - const google::protobuf::Descriptor * getMessageTypeForSchemaRegistry(const String & base_url, UInt32 schema_id, const std::vector & indexes); - SchemaValidationErrors validateSchema(std::string_view schema); /// proton: ends private: class ImporterWithSourceTree; std::unordered_map> importers; std::mutex mutex; - - /// proton: starts - google::protobuf::DescriptorPool registry_pool_; - - google::protobuf::DescriptorPool* registry_pool() { return ®istry_pool_; } - /// proton: ends }; } diff --git a/src/Processors/Formats/Impl/AvroRowInputFormat.cpp b/src/Processors/Formats/Impl/AvroRowInputFormat.cpp index f3dfb24fd95..a2470d91627 100644 --- a/src/Processors/Formats/Impl/AvroRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/AvroRowInputFormat.cpp @@ -644,14 +644,96 @@ bool AvroRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &ext return false; } +/// proton: starts +/// class AvroConfluentRowInputFormat::SchemaRegistry +/// { +/// public: +/// explicit SchemaRegistry(const std::string & base_url_, size_t schema_cache_max_size = 1000) +/// : base_url(base_url_), schema_cache(schema_cache_max_size) +/// { +/// if (base_url.empty()) +/// throw Exception("Empty Schema Registry URL", ErrorCodes::BAD_ARGUMENTS); +/// } +/// +/// avro::ValidSchema getSchema(uint32_t id) +/// { +/// auto [schema, loaded] = schema_cache.getOrSet( +/// id, +/// [this, id](){ return std::make_shared(fetchSchema(id)); } +/// ); +/// return *schema; +/// } +/// +/// private: +/// avro::ValidSchema fetchSchema(uint32_t id) +/// { +/// try +/// { +/// try +/// { +/// Poco::URI url(base_url, "/schemas/ids/" + std::to_string(id)); +/// LOG_TRACE((&Poco::Logger::get("AvroConfluentRowInputFormat")), "Fetching schema id = {}", id); +/// +/// /// One second for connect/send/receive. Just in case. 
+/// ConnectionTimeouts timeouts({1, 0}, {1, 0}, {1, 0}); +/// +/// Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_GET, url.getPathAndQuery(), Poco::Net::HTTPRequest::HTTP_1_1); +/// request.setHost(url.getHost()); +/// +/// auto session = makePooledHTTPSession(url, timeouts, 1); +/// std::istream * response_body{}; +/// try +/// { +/// session->sendRequest(request); +/// +/// Poco::Net::HTTPResponse response; +/// response_body = receiveResponse(*session, request, response, false); +/// } +/// catch (const Poco::Exception & e) +/// { +/// /// We use session data storage as storage for exception text +/// /// Depend on it we can deduce to reconnect session or reresolve session host +/// session->attachSessionData(e.message()); +/// throw; +/// } +/// Poco::JSON::Parser parser; +/// auto json_body = parser.parse(*response_body).extract(); +/// auto schema = json_body->getValue("schema"); +/// LOG_TRACE((&Poco::Logger::get("AvroConfluentRowInputFormat")), "Successfully fetched schema id = {}\n{}", id, schema); +/// +/// return avro::compileJsonSchemaFromString(schema); +/// } +/// catch (const Exception &) +/// { +/// throw; +/// } +/// catch (const Poco::Exception & e) +/// { +/// throw Exception(Exception::CreateFromPocoTag{}, e); +/// } +/// catch (const avro::Exception & e) +/// { +/// throw Exception(e.what(), ErrorCodes::INCORRECT_DATA); +/// } +/// } +/// catch (Exception & e) +/// { +/// e.addMessage("while fetching schema id = " + std::to_string(id)); +/// throw; +/// } +/// } +/// +/// Poco::URI base_url; +/// LRUCache schema_cache; +/// }; + class AvroConfluentRowInputFormat::SchemaRegistry { public: - explicit SchemaRegistry(const std::string & base_url_, size_t schema_cache_max_size = 1000) - : base_url(base_url_), schema_cache(schema_cache_max_size) + explicit SchemaRegistry(const std::string & base_url, const std::string & credentials, size_t schema_cache_max_size = 1000) + : registry(base_url, credentials) + , schema_cache(schema_cache_max_size) { - if (base_url.empty()) - throw Exception("Empty Schema Registry URL", ErrorCodes::BAD_ARGUMENTS); } avro::ValidSchema getSchema(uint32_t id) @@ -666,70 +748,23 @@ class AvroConfluentRowInputFormat::SchemaRegistry private: avro::ValidSchema fetchSchema(uint32_t id) { - /// proton: starts - /// try - /// { - try - { - /// Poco::URI url(base_url, "/schemas/ids/" + std::to_string(id)); - /// LOG_TRACE((&Poco::Logger::get("AvroConfluentRowInputFormat")), "Fetching schema id = {}", id); - /// - /// /// One second for connect/send/receive. Just in case. 
- /// ConnectionTimeouts timeouts({1, 0}, {1, 0}, {1, 0}); - /// - /// Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_GET, url.getPathAndQuery(), Poco::Net::HTTPRequest::HTTP_1_1); - /// request.setHost(url.getHost()); - /// - /// auto session = makePooledHTTPSession(url, timeouts, 1); - /// std::istream * response_body{}; - /// try - /// { - /// session->sendRequest(request); - /// - /// Poco::Net::HTTPResponse response; - /// response_body = receiveResponse(*session, request, response, false); - /// } - /// catch (const Poco::Exception & e) - /// { - /// /// We use session data storage as storage for exception text - /// /// Depend on it we can deduce to reconnect session or reresolve session host - /// session->attachSessionData(e.message()); - /// throw; - /// } - /// Poco::JSON::Parser parser; - /// auto json_body = parser.parse(*response_body).extract(); - /// auto schema = json_body->getValue("schema"); - /// LOG_TRACE((&Poco::Logger::get("AvroConfluentRowInputFormat")), "Successfully fetched schema id = {}\n{}", id, schema); - - auto schema = KafkaSchemaRegistry::instance().fetchSchema(base_url, id); - return avro::compileJsonSchemaFromString(schema); - } - /// catch (const Exception &) - /// { - /// throw; - /// } - /// catch (const Poco::Exception & e) - /// { - /// throw Exception(Exception::CreateFromPocoTag{}, e); - /// } - catch (const avro::Exception & e) - { - auto ex = Exception(e.what(), ErrorCodes::INCORRECT_DATA); - ex.addMessage("while fetching schema id = " + std::to_string(id)); - throw std::move(ex); - } - /// } - /// catch (Exception & e) - /// { - /// e.addMessage("while fetching schema id = " + std::to_string(id)); - /// throw; - /// } - /// proton: ends + auto schema = registry.fetchSchema(id, "AVRO"); + try + { + return avro::compileJsonSchemaFromString(schema); + } + catch (const avro::Exception & e) + { + auto ex = Exception(e.what(), ErrorCodes::INCORRECT_DATA); + ex.addMessage("while fetching schema id = " + std::to_string(id)); + throw std::move(ex); + } } - Poco::URI base_url; + KafkaSchemaRegistry registry; LRUCache schema_cache; }; +/// proton: ends using ConfluentSchemaRegistry = AvroConfluentRowInputFormat::SchemaRegistry; #define SCHEMA_REGISTRY_CACHE_MAX_SIZE 1000 @@ -738,14 +773,17 @@ static LRUCache schema_registry_cache(SCH static std::shared_ptr getConfluentSchemaRegistry(const FormatSettings & format_settings) { - const auto & base_url = format_settings.avro.schema_registry_url; + /// proton: starts + const auto & base_url = format_settings.schema.kafka_schema_registry_url.empty() ? 
format_settings.avro.schema_registry_url : format_settings.schema.kafka_schema_registry_url; + const auto & credentials = format_settings.schema.kafka_schema_registry_credentials; auto [schema_registry, loaded] = schema_registry_cache.getOrSet( - base_url, - [base_url]() + base_url + credentials, + [base_url, credentials]() { - return std::make_shared(base_url); + return std::make_shared(base_url, credentials); } ); + /// proton: ends return schema_registry; } @@ -804,7 +842,7 @@ bool AvroConfluentRowInputFormat::readRow(MutableColumns & columns, RowReadExten { return false; } - SchemaId schema_id = KafkaSchemaRegistry::instance().readSchemaId(*in); + SchemaId schema_id = KafkaSchemaRegistry::readSchemaId(*in); const auto & deserializer = getOrCreateDeserializer(schema_id); deserializer.deserializeRow(columns, *decoder, ext); decoder->drain(); @@ -839,7 +877,7 @@ NamesAndTypesList AvroSchemaReader::readSchema() avro::NodePtr root_node; if (confluent) { - UInt32 schema_id = KafkaSchemaRegistry::instance().readSchemaId(in); + UInt32 schema_id = KafkaSchemaRegistry::readSchemaId(in); root_node = getConfluentSchemaRegistry(format_settings)->getSchema(schema_id).root(); } else diff --git a/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp b/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp index d5b65cebe2a..24e4ee79129 100644 --- a/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp @@ -1,5 +1,4 @@ #include "ProtobufRowInputFormat.h" -#include "Common/LRUCache.h" #if USE_PROTOBUF # include @@ -13,12 +12,21 @@ # include /// proton: starts +# include # include +# include +# include +# include /// proton: ends namespace DB { +namespace ErrorCodes +{ +extern const int INVALID_DATA; +} + ProtobufRowInputFormat::ProtobufRowInputFormat( ReadBuffer & in_, const Block & header_, const Params & params_, const FormatSchemaInfo & schema_info_, bool with_length_delimiter_) : IRowInputFormat(header_, in_, params_, ProcessorID::ProtobufRowInputFormatID) @@ -120,10 +128,86 @@ NamesAndTypesList ProtobufSchemaReader::readSchema() } /// proton: starts +namespace +{ +using ConfluentSchemaRegistry = ProtobufConfluentRowInputFormat::SchemaRegistryWithCache; +#define SCHEMA_REGISTRY_CACHE_MAX_SIZE 1000 +/// Cache of Schema Registry URL -> SchemaRegistry +LRUCache schema_registry_cache(SCHEMA_REGISTRY_CACHE_MAX_SIZE); + +std::shared_ptr getConfluentSchemaRegistry(const String & base_url, const String & credentials) +{ + auto [schema_registry, loaded] = schema_registry_cache.getOrSet( + base_url + credentials, + [base_url, credentials]() + { + return std::make_shared(base_url, credentials); + } + ); + return schema_registry; +} +} + +class ProtobufConfluentRowInputFormat::SchemaRegistryWithCache +{ +public: + SchemaRegistryWithCache(const String & base_url, const String & credentials) + : registry(base_url, credentials) + { + } + + const google::protobuf::Descriptor * getMessageType(uint32_t schema_id, const std::vector & indexes) + { + assert(!indexes.empty()); + + const auto *descriptor = getSchema(schema_id)->message_type(indexes[0]); + + for (auto i : indexes) + { + if (i > static_cast(descriptor->nested_type_count())) + throw Exception(ErrorCodes::INVALID_DATA, "Invalid message index={} max_index={} descriptor={}", i, descriptor->nested_type_count(), descriptor->name()); + descriptor = descriptor->nested_type(i); + } + + return descriptor; + } + +private: + const google::protobuf::FileDescriptor * getSchema(uint32_t id) + { + const auto * 
ret = registry_pool()->FindFileByName(""); + if (ret) + return ret; + + return fetchSchema(id); + } + + const google::protobuf::FileDescriptor* fetchSchema(uint32_t id) + { + auto schema = registry.fetchSchema(id, "PROTOBUF"); + google::protobuf::io::ArrayInputStream input{schema.data(), static_cast(schema.size())}; + google::protobuf::io::Tokenizer tokenizer(&input, /*FIXME*/nullptr); + google::protobuf::FileDescriptorProto descriptor; + google::protobuf::compiler::Parser parser; + + parser.RecordErrorsTo(/*FIXME*/nullptr); + parser.Parse(&tokenizer, &descriptor); + // if (!error_collector.getErrors().empty()) + // throw Exception(ErrorCodes::CANNOT_PARSE_PROTOBUF_SCHEMA, "Invalid protobuf schema FIXME"); + + return registry_pool()->BuildFile(descriptor); + } + + KafkaSchemaRegistry registry; + google::protobuf::DescriptorPool registry_pool_; + + google::protobuf::DescriptorPool* registry_pool() { return ®istry_pool_; } +}; + ProtobufConfluentRowInputFormat::ProtobufConfluentRowInputFormat( ReadBuffer & in_, const Block & header_, Params params_, const FormatSettings & format_settings_) : IRowInputFormat(header_, in_, params_, ProcessorID::ProtobufRowInputFormatID) - , registry_url(format_settings_.schema.kafka_schema_registry_url) + , registry(getConfluentSchemaRegistry(format_settings_.schema.kafka_schema_registry_url, format_settings_.schema.kafka_schema_registry_credentials)) , reader(std::make_unique(in_)) { } @@ -144,8 +228,7 @@ bool ProtobufConfluentRowInputFormat::readRow(MutableColumns & columns, RowReadE if (reader->eof()) return false; - auto & registry = KafkaSchemaRegistry::instance(); - auto schema_id = registry.readSchemaId(*in); + auto schema_id = KafkaSchemaRegistry::readSchemaId(*in); auto indexes_count = reader->readSInt(); std::vector indexes(1); @@ -163,7 +246,7 @@ bool ProtobufConfluentRowInputFormat::readRow(MutableColumns & columns, RowReadE header.getNames(), header.getDataTypes(), missing_column_indices, - *ProtobufSchemas::instance().getMessageTypeForSchemaRegistry(registry_url, schema_id, indexes), + *registry->getMessageType(schema_id, indexes), /*with_length_delimiter=*/false, *reader); diff --git a/src/Processors/Formats/Impl/ProtobufRowInputFormat.h b/src/Processors/Formats/Impl/ProtobufRowInputFormat.h index 6910c613359..47947182981 100644 --- a/src/Processors/Formats/Impl/ProtobufRowInputFormat.h +++ b/src/Processors/Formats/Impl/ProtobufRowInputFormat.h @@ -1,5 +1,6 @@ #pragma once +#include "Formats/KafkaSchemaRegistry.h" #include "config.h" #if USE_PROTOBUF @@ -80,15 +81,14 @@ class ProtobufConfluentRowInputFormat final : public IRowInputFormat void setReadBuffer(ReadBuffer & buf) override; + class SchemaRegistryWithCache; + private: bool readRow(MutableColumns & columns, RowReadExtension & row_read_extension) override; - bool allowSyncAfterError() const override { return true; } void syncAfterError() override; - using SchemaId = uint32_t; - - String registry_url; + std::shared_ptr registry; std::unique_ptr reader; std::vector missing_column_indices; std::unique_ptr serializer; From af741881b8c874e99b49b7ef7eecf718581a9ec2 Mon Sep 17 00:00:00 2001 From: Gimi Liang Date: Fri, 23 Feb 2024 01:49:47 -0800 Subject: [PATCH 04/16] fixed issues --- src/Formats/KafkaSchemaRegistry.cpp | 6 +- src/KafkaLog/KafkaWALPool.cpp | 10 +-- src/KafkaLog/KafkaWALSettings.h | 1 + .../Formats/Impl/ProtobufRowInputFormat.cpp | 85 ++++++++++++++----- .../Formats/Impl/ProtobufRowInputFormat.h | 2 +- 5 files changed, 73 insertions(+), 31 deletions(-) diff --git 
a/src/Formats/KafkaSchemaRegistry.cpp b/src/Formats/KafkaSchemaRegistry.cpp index fede187cd3d..0e5fa959ae3 100644 --- a/src/Formats/KafkaSchemaRegistry.cpp +++ b/src/Formats/KafkaSchemaRegistry.cpp @@ -25,7 +25,7 @@ KafkaSchemaRegistry::KafkaSchemaRegistry(const String & base_url_, const String else { credentials.setUsername(credentials_.substr(0, pos)); - credentials.setPassword(credentials_.substr(pos)); + credentials.setPassword(credentials_.substr(pos + 1)); } } @@ -70,8 +70,8 @@ String KafkaSchemaRegistry::fetchSchema(UInt32 id, const String & expected_schem if (!expected_schema_type.empty()) { - auto schema_type = json_body->getValue("type"); - if (boost::iequals(schema_type, expected_schema_type)) + auto schema_type = json_body->getValue("schemaType"); + if (!boost::iequals(schema_type, expected_schema_type)) throw Exception(ErrorCodes::TYPE_MISMATCH, "Expected schema type {}, got {}", expected_schema_type, schema_type); } auto schema = json_body->getValue("schema"); diff --git a/src/KafkaLog/KafkaWALPool.cpp b/src/KafkaLog/KafkaWALPool.cpp index 872bc010801..b8e22363ef2 100644 --- a/src/KafkaLog/KafkaWALPool.cpp +++ b/src/KafkaLog/KafkaWALPool.cpp @@ -439,13 +439,13 @@ KafkaWALSimpleConsumerPtr KafkaWALPool::getOrCreateStreamingExternal(const Strin /// Create one auto ksettings = std::make_unique(); - ksettings->fetch_wait_max_ms = fetch_wait_max_ms; + ksettings->fetch_wait_max_ms = fetch_wait_max_ms; - ksettings->brokers = brokers; + ksettings->brokers = brokers; - /// Streaming WALs have a different group ID - ksettings->group_id += "-tp-external-streaming-query-" + std::to_string(consumers.second.size() + 1); - ksettings->auth = auth; + /// Streaming WALs have a different group ID + ksettings->group_id += "-tp-external-streaming-query-" + std::to_string(consumers.second.size() + 1); + ksettings->auth = auth; /// We don't care offset checkpointing for WALs used for streaming processing, /// No auto commit diff --git a/src/KafkaLog/KafkaWALSettings.h b/src/KafkaLog/KafkaWALSettings.h index 484e59a8ecd..ef81ed35076 100644 --- a/src/KafkaLog/KafkaWALSettings.h +++ b/src/KafkaLog/KafkaWALSettings.h @@ -146,6 +146,7 @@ struct KafkaWALSettings settings.push_back(fmt::format("shared_subscription_flush_threshold_bytes={}", shared_subscription_flush_threshold_bytes)); settings.push_back(fmt::format("shared_subscription_flush_threshold_ms={}", shared_subscription_flush_threshold_ms)); settings.push_back(fmt::format("auth.security.protocol={}", auth.security_protocol)); + settings.push_back(fmt::format("auth.sasl.mechanism={}", auth.sasl_mechanism)); settings.push_back(fmt::format("auth.username={}", auth.username)); settings.push_back(fmt::format("auth.password={}", auth.password)); settings.push_back(fmt::format("auth.ssl.ca.location={}", auth.ssl_ca_cert_file)); diff --git a/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp b/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp index 24e4ee79129..0bddead9a0c 100644 --- a/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp @@ -14,6 +14,7 @@ /// proton: starts # include # include +# include # include # include # include @@ -132,7 +133,7 @@ namespace { using ConfluentSchemaRegistry = ProtobufConfluentRowInputFormat::SchemaRegistryWithCache; #define SCHEMA_REGISTRY_CACHE_MAX_SIZE 1000 -/// Cache of Schema Registry URL -> SchemaRegistry +/// Cache of Schema Registry URL + credentials -> SchemaRegistry LRUCache schema_registry_cache(SCHEMA_REGISTRY_CACHE_MAX_SIZE); 
std::shared_ptr getConfluentSchemaRegistry(const String & base_url, const String & credentials) @@ -148,6 +149,25 @@ std::shared_ptr getConfluentSchemaRegistry(const String } } +namespace +{ +class ErrorThrower final: public google::protobuf::io::ErrorCollector +{ +public: + explicit ErrorThrower(UInt32 schema_id_): schema_id(schema_id_) + { + } + + void AddError(int line, google::protobuf::io::ColumnNumber column, const std::string & message) override + { + throw Exception(ErrorCodes::INVALID_DATA, "Failed to parse schema {}: line={}, column={}, message={}", schema_id, line, column, message); + } + +private: + UInt32 schema_id; +}; +} + class ProtobufConfluentRowInputFormat::SchemaRegistryWithCache { public: @@ -160,9 +180,14 @@ class ProtobufConfluentRowInputFormat::SchemaRegistryWithCache { assert(!indexes.empty()); - const auto *descriptor = getSchema(schema_id)->message_type(indexes[0]); + const auto *fd = getSchema(schema_id); + auto mt_count = fd->message_type_count(); + if (mt_count < indexes[0] + 1) + throw Exception(ErrorCodes::INVALID_DATA, "Invalid message index={} max_index={}", indexes[0], mt_count); - for (auto i : indexes) + const auto *descriptor = fd->message_type(indexes[0]); + + for (auto i : std::vector(indexes.begin() + 1, indexes.end())) { if (i > static_cast(descriptor->nested_type_count())) throw Exception(ErrorCodes::INVALID_DATA, "Invalid message index={} max_index={} descriptor={}", i, descriptor->nested_type_count(), descriptor->name()); @@ -175,9 +200,9 @@ class ProtobufConfluentRowInputFormat::SchemaRegistryWithCache private: const google::protobuf::FileDescriptor * getSchema(uint32_t id) { - const auto * ret = registry_pool()->FindFileByName(""); - if (ret) - return ret; + const auto * loaded_descriptor = registry_pool()->FindFileByName(std::to_string(id)); + if (loaded_descriptor) + return loaded_descriptor; return fetchSchema(id); } @@ -185,17 +210,21 @@ class ProtobufConfluentRowInputFormat::SchemaRegistryWithCache const google::protobuf::FileDescriptor* fetchSchema(uint32_t id) { auto schema = registry.fetchSchema(id, "PROTOBUF"); + ErrorThrower err_thrower {id}; + std::string schema_content {schema.data(), schema.size()}; google::protobuf::io::ArrayInputStream input{schema.data(), static_cast(schema.size())}; - google::protobuf::io::Tokenizer tokenizer(&input, /*FIXME*/nullptr); + google::protobuf::io::Tokenizer tokenizer(&input, &err_thrower); google::protobuf::FileDescriptorProto descriptor; + descriptor.set_name(std::to_string(id)); google::protobuf::compiler::Parser parser; - - parser.RecordErrorsTo(/*FIXME*/nullptr); + parser.RecordErrorsTo(&err_thrower); parser.Parse(&tokenizer, &descriptor); - // if (!error_collector.getErrors().empty()) - // throw Exception(ErrorCodes::CANNOT_PARSE_PROTOBUF_SCHEMA, "Invalid protobuf schema FIXME"); - return registry_pool()->BuildFile(descriptor); + auto const * ret = registry_pool()->BuildFile(descriptor); + auto mt_count = ret->message_type_count(); + if (mt_count < 1) + throw Exception(ErrorCodes::INVALID_DATA, "No message type in schema"); + return ret; } KafkaSchemaRegistry registry; @@ -212,11 +241,11 @@ ProtobufConfluentRowInputFormat::ProtobufConfluentRowInputFormat( { } -void ProtobufConfluentRowInputFormat::setReadBuffer(ReadBuffer & buf) -{ - IInputFormat::setReadBuffer(buf); - reader->setReadBuffer(buf); -} +// void ProtobufConfluentRowInputFormat::setReadBuffer(ReadBuffer & buf) +// { +// IInputFormat::setReadBuffer(buf); +// reader->setReadBuffer(buf); +// } void 
ProtobufConfluentRowInputFormat::syncAfterError() { @@ -225,30 +254,42 @@ void ProtobufConfluentRowInputFormat::syncAfterError() bool ProtobufConfluentRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & row_read_extension) { - if (reader->eof()) + if (in->eof()) return false; auto schema_id = KafkaSchemaRegistry::readSchemaId(*in); - auto indexes_count = reader->readSInt(); - std::vector indexes(1); + + Int64 indexes_count = 0; + readVarInt(indexes_count, *in); + + std::vector indexes; + indexes.reserve(indexes_count < 1 ? 1 : indexes_count); + if (indexes_count < 1) indexes.push_back(0); else { for (size_t i = 0; i < static_cast(indexes_count); i++) - indexes.push_back(reader->readSInt()); + { + Int64 ind = 0; + readVarInt(ind, *in); + indexes.push_back(ind); + } } const auto & header = getPort().getHeader(); + // reader->setReadBuffer(*in); + + ProtobufReader reader_ {*in}; serializer = ProtobufSerializer::create( header.getNames(), header.getDataTypes(), missing_column_indices, *registry->getMessageType(schema_id, indexes), /*with_length_delimiter=*/false, - *reader); + reader_); size_t row_num = columns.empty() ? 0 : columns[0]->size(); if (!row_num) diff --git a/src/Processors/Formats/Impl/ProtobufRowInputFormat.h b/src/Processors/Formats/Impl/ProtobufRowInputFormat.h index 47947182981..b58f4f76df9 100644 --- a/src/Processors/Formats/Impl/ProtobufRowInputFormat.h +++ b/src/Processors/Formats/Impl/ProtobufRowInputFormat.h @@ -79,7 +79,7 @@ class ProtobufConfluentRowInputFormat final : public IRowInputFormat ProtobufConfluentRowInputFormat(ReadBuffer & in_, const Block & header_, Params params_, const FormatSettings & format_settings_); String getName() const override { return "ProtobufConfluentRowInputFormat"; } - void setReadBuffer(ReadBuffer & buf) override; + // void setReadBuffer(ReadBuffer & buf) override; class SchemaRegistryWithCache; From 5b135086111dca1894a4ce370fc4a29d157ff83e Mon Sep 17 00:00:00 2001 From: Gimi Liang Date: Fri, 23 Feb 2024 02:03:32 -0800 Subject: [PATCH 05/16] small cleanup --- .../Formats/Impl/ProtobufRowInputFormat.cpp | 18 ++---------------- .../Formats/Impl/ProtobufRowInputFormat.h | 3 --- 2 files changed, 2 insertions(+), 19 deletions(-) diff --git a/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp b/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp index 0bddead9a0c..878fc8636dd 100644 --- a/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp @@ -237,21 +237,9 @@ ProtobufConfluentRowInputFormat::ProtobufConfluentRowInputFormat( ReadBuffer & in_, const Block & header_, Params params_, const FormatSettings & format_settings_) : IRowInputFormat(header_, in_, params_, ProcessorID::ProtobufRowInputFormatID) , registry(getConfluentSchemaRegistry(format_settings_.schema.kafka_schema_registry_url, format_settings_.schema.kafka_schema_registry_credentials)) - , reader(std::make_unique(in_)) { } -// void ProtobufConfluentRowInputFormat::setReadBuffer(ReadBuffer & buf) -// { -// IInputFormat::setReadBuffer(buf); -// reader->setReadBuffer(buf); -// } - -void ProtobufConfluentRowInputFormat::syncAfterError() -{ - reader->endMessage(true); -} - bool ProtobufConfluentRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & row_read_extension) { if (in->eof()) @@ -280,16 +268,14 @@ bool ProtobufConfluentRowInputFormat::readRow(MutableColumns & columns, RowReadE const auto & header = getPort().getHeader(); - // reader->setReadBuffer(*in); - - ProtobufReader reader_ 
{*in}; + ProtobufReader reader {*in}; serializer = ProtobufSerializer::create( header.getNames(), header.getDataTypes(), missing_column_indices, *registry->getMessageType(schema_id, indexes), /*with_length_delimiter=*/false, - reader_); + reader); size_t row_num = columns.empty() ? 0 : columns[0]->size(); if (!row_num) diff --git a/src/Processors/Formats/Impl/ProtobufRowInputFormat.h b/src/Processors/Formats/Impl/ProtobufRowInputFormat.h index b58f4f76df9..e2276f922c7 100644 --- a/src/Processors/Formats/Impl/ProtobufRowInputFormat.h +++ b/src/Processors/Formats/Impl/ProtobufRowInputFormat.h @@ -85,11 +85,8 @@ class ProtobufConfluentRowInputFormat final : public IRowInputFormat private: bool readRow(MutableColumns & columns, RowReadExtension & row_read_extension) override; - bool allowSyncAfterError() const override { return true; } - void syncAfterError() override; std::shared_ptr registry; - std::unique_ptr reader; std::vector missing_column_indices; std::unique_ptr serializer; }; From ba0661a1a843b6e9c796a101f94c0a38f34964b6 Mon Sep 17 00:00:00 2001 From: Gimi Liang Date: Sat, 24 Feb 2024 01:10:03 -0800 Subject: [PATCH 06/16] fixed and enabled Avro --- docker/packager/packager | 1 + src/Formats/KafkaSchemaRegistry.cpp | 10 +--------- src/Formats/KafkaSchemaRegistry.h | 2 +- src/Processors/Formats/Impl/AvroRowInputFormat.cpp | 12 +++++++++--- src/Processors/Formats/Impl/AvroRowInputFormat.h | 4 ++-- .../Formats/Impl/ProtobufRowInputFormat.cpp | 2 +- 6 files changed, 15 insertions(+), 16 deletions(-) diff --git a/docker/packager/packager b/docker/packager/packager index 67bc8ed08e8..75612f65c38 100755 --- a/docker/packager/packager +++ b/docker/packager/packager @@ -219,6 +219,7 @@ def parse_env_variables(build_type, compiler, sanitizer, package_type, image_typ cmake_flags.append('-DENABLE_KRB5=ON') cmake_flags.append('-DENABLE_BROTLI=ON') cmake_flags.append('-DENABLE_S3=ON') + cmake_flags.append('-DENABLE_AVRO=ON') # krb5: Disabled in environments other than Linux and native Darwin. 
# Reference: contrib/krb5-cmake/CMakeLists.txt:3 diff --git a/src/Formats/KafkaSchemaRegistry.cpp b/src/Formats/KafkaSchemaRegistry.cpp index 0e5fa959ae3..637dcb1159c 100644 --- a/src/Formats/KafkaSchemaRegistry.cpp +++ b/src/Formats/KafkaSchemaRegistry.cpp @@ -11,7 +11,6 @@ namespace DB namespace ErrorCodes { extern const int INCORRECT_DATA; -extern const int TYPE_MISMATCH; } KafkaSchemaRegistry::KafkaSchemaRegistry(const String & base_url_, const String & credentials_): base_url(base_url_) @@ -29,7 +28,7 @@ KafkaSchemaRegistry::KafkaSchemaRegistry(const String & base_url_, const String } } -String KafkaSchemaRegistry::fetchSchema(UInt32 id, const String & expected_schema_type) +String KafkaSchemaRegistry::fetchSchema(UInt32 id) { assert(!base_url.empty()); @@ -67,13 +66,6 @@ String KafkaSchemaRegistry::fetchSchema(UInt32 id, const String & expected_schem } Poco::JSON::Parser parser; auto json_body = parser.parse(*response_body).extract(); - - if (!expected_schema_type.empty()) - { - auto schema_type = json_body->getValue("schemaType"); - if (!boost::iequals(schema_type, expected_schema_type)) - throw Exception(ErrorCodes::TYPE_MISMATCH, "Expected schema type {}, got {}", expected_schema_type, schema_type); - } auto schema = json_body->getValue("schema"); LOG_TRACE((&Poco::Logger::get("KafkaSchemaRegistry")), "Successfully fetched schema id = {}\n{}", id, schema); return schema; diff --git a/src/Formats/KafkaSchemaRegistry.h b/src/Formats/KafkaSchemaRegistry.h index 0e7fce939b0..8f2b1d3154d 100644 --- a/src/Formats/KafkaSchemaRegistry.h +++ b/src/Formats/KafkaSchemaRegistry.h @@ -16,7 +16,7 @@ class KafkaSchemaRegistry final /// `credentials_` is expected to be formatted in ":". KafkaSchemaRegistry(const String & base_url_, const String & credentials_); - String fetchSchema(UInt32 id, const String & expected_schema_type = ""); + String fetchSchema(UInt32 id); private: Poco::URI base_url; diff --git a/src/Processors/Formats/Impl/AvroRowInputFormat.cpp b/src/Processors/Formats/Impl/AvroRowInputFormat.cpp index a2470d91627..e368a1e690c 100644 --- a/src/Processors/Formats/Impl/AvroRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/AvroRowInputFormat.cpp @@ -748,7 +748,7 @@ class AvroConfluentRowInputFormat::SchemaRegistry private: avro::ValidSchema fetchSchema(uint32_t id) { - auto schema = registry.fetchSchema(id, "AVRO"); + auto schema = registry.fetchSchema(id); try { return avro::compileJsonSchemaFromString(schema); @@ -823,12 +823,12 @@ AvroConfluentRowInputFormat::AvroConfluentRowInputFormat( const Block & header_, ReadBuffer & in_, Params params_, const FormatSettings & format_settings_) : IRowInputFormat(header_, in_, params_, ProcessorID::AvroConfluentRowInputFormatID) , schema_registry(getConfluentSchemaRegistry(format_settings_)) - , input_stream(std::make_unique(*in)) + /// , input_stream(std::make_unique(*in)) /* proton: updates */ , decoder(avro::binaryDecoder()) , format_settings(format_settings_) { - decoder->init(*input_stream); + /// decoder->init(*input_stream); /* proton: updates */ } bool AvroConfluentRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & ext) @@ -842,8 +842,14 @@ bool AvroConfluentRowInputFormat::readRow(MutableColumns & columns, RowReadExten { return false; } + + /// proton: starts SchemaId schema_id = KafkaSchemaRegistry::readSchemaId(*in); const auto & deserializer = getOrCreateDeserializer(schema_id); + InputStreamReadBufferAdapter is {*in}; + decoder->init(is); + /// proton: ends + deserializer.deserializeRow(columns, *decoder, ext); 
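        /// A note on the proton change above: upstream initialized the decoder once from a
        /// long-lived input_stream member, but here every Kafka message is its own framed Avro
        /// datum, so the decoder is re-initialized per message from a fresh
        /// InputStreamReadBufferAdapter over *in. The drain() call below then lets the decoder
        /// hand back anything it buffered past the end of the datum, keeping *in positioned at
        /// the next message (our reading of avro::Decoder::drain()).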
decoder->drain(); return true; diff --git a/src/Processors/Formats/Impl/AvroRowInputFormat.h b/src/Processors/Formats/Impl/AvroRowInputFormat.h index 2ec3fdace8f..8b98255bae8 100644 --- a/src/Processors/Formats/Impl/AvroRowInputFormat.h +++ b/src/Processors/Formats/Impl/AvroRowInputFormat.h @@ -145,7 +145,7 @@ class AvroConfluentRowInputFormat final : public IRowInputFormat class SchemaRegistry; private: - virtual bool readRow(MutableColumns & columns, RowReadExtension & ext) override; + bool readRow(MutableColumns & columns, RowReadExtension & ext) override; /* proton: updates */ bool allowSyncAfterError() const override { return true; } void syncAfterError() override; @@ -155,7 +155,7 @@ class AvroConfluentRowInputFormat final : public IRowInputFormat std::unordered_map deserializer_cache; const AvroDeserializer & getOrCreateDeserializer(SchemaId schema_id); - avro::InputStreamPtr input_stream; + /// avro::InputStreamPtr input_stream; /* proton: updates */ avro::DecoderPtr decoder; FormatSettings format_settings; }; diff --git a/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp b/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp index 878fc8636dd..10286dbb84d 100644 --- a/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp @@ -209,7 +209,7 @@ class ProtobufConfluentRowInputFormat::SchemaRegistryWithCache const google::protobuf::FileDescriptor* fetchSchema(uint32_t id) { - auto schema = registry.fetchSchema(id, "PROTOBUF"); + auto schema = registry.fetchSchema(id); ErrorThrower err_thrower {id}; std::string schema_content {schema.data(), schema.size()}; google::protobuf::io::ArrayInputStream input{schema.data(), static_cast(schema.size())}; From 20ed03037eb4aa3ce2cf6de86d33d8ea13d186bd Mon Sep 17 00:00:00 2001 From: Gimi Liang Date: Sat, 24 Feb 2024 01:25:29 -0800 Subject: [PATCH 07/16] allow Avro data format to be used with schema registry --- build.sh | 5 +++-- src/Processors/Formats/Impl/AvroRowInputFormat.cpp | 10 ++++++++-- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/build.sh b/build.sh index 4b50d027cce..1680edf7eb4 100755 --- a/build.sh +++ b/build.sh @@ -15,7 +15,7 @@ if [ -z "$sanitizer" ]; then fi cmake .. \ - -DCMAKE_BUILD_TYPE=${build_type} \ + -DCMAKE_BUILD_TYPE="$build_type" \ -DENABLE_PROTON_ALL=OFF \ -DENABLE_PROTON_SERVER=ON \ -DENABLE_PROTON_CLIENT=ON \ @@ -73,5 +73,6 @@ cmake ..
\ -DENABLE_SNOWFLAKE_FUNCS=ON \ -DENABLE_ENCRYPT_DECRYPT_FUNCS=ON \ -DENABLE_DEBUG_FUNCS=ON \ - -DENABLE_URL_FUNCS=ON + -DENABLE_URL_FUNCS=ON \ + -DENABLE_AVRO=ON diff --git a/src/Processors/Formats/Impl/AvroRowInputFormat.cpp b/src/Processors/Formats/Impl/AvroRowInputFormat.cpp index e368a1e690c..5483bac10e1 100644 --- a/src/Processors/Formats/Impl/AvroRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/AvroRowInputFormat.cpp @@ -965,9 +965,15 @@ void registerInputFormatAvro(FormatFactory & factory) ReadBuffer & buf, const Block & sample, const RowInputFormatParams & params, - const FormatSettings & settings) + const FormatSettings & settings) -> InputFormatPtr { - return std::make_shared(sample, buf, params, settings); + /// proton: starts + /// Use only one format name "Avro" to support both shema registry and non-schema registry use cases, rather than using another name "AvroConfluent" + if (settings.avro.schema_registry_url.empty() && settings.schema.kafka_schema_registry_url.empty()) + return std::make_shared(sample, buf, params, settings); + + return std::make_shared(sample, buf, params, settings); + /// proton: ends }); factory.registerInputFormat("AvroConfluent",[]( From 1ed7a659e7bc4a36e044a331fece40a546f22d69 Mon Sep 17 00:00:00 2001 From: Gimi Liang Date: Sat, 24 Feb 2024 01:43:14 -0800 Subject: [PATCH 08/16] do not allow format_schema and schema_registry_url used at the same time --- src/Processors/Formats/Impl/AvroRowInputFormat.cpp | 7 +++++++ src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp | 6 ++++++ 2 files changed, 13 insertions(+) diff --git a/src/Processors/Formats/Impl/AvroRowInputFormat.cpp b/src/Processors/Formats/Impl/AvroRowInputFormat.cpp index 5483bac10e1..f4a7c905cad 100644 --- a/src/Processors/Formats/Impl/AvroRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/AvroRowInputFormat.cpp @@ -81,6 +81,10 @@ namespace ErrorCodes extern const int TYPE_MISMATCH; extern const int CANNOT_PARSE_UUID; extern const int CANNOT_READ_ALL_DATA; + + /// proton: starts + extern const int INVALID_SETTING_VALUE; + /// proton: ends } class InputStreamReadBufferAdapter : public avro::InputStream @@ -969,9 +973,12 @@ void registerInputFormatAvro(FormatFactory & factory) { /// proton: starts /// Use only one format name "Avro" to support both shema registry and non-schema registry use cases, rather than using another name "AvroConfluent" + if (settings.avro.schema_registry_url.empty() && settings.schema.kafka_schema_registry_url.empty()) return std::make_shared(sample, buf, params, settings); + if (!settings.schema.format_schema.empty()) + throw Exception(ErrorCodes::INVALID_SETTING_VALUE, "schema_registry_url and format_schema cannot be used at the same time"); return std::make_shared(sample, buf, params, settings); /// proton: ends }); diff --git a/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp b/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp index 10286dbb84d..0456541bf39 100644 --- a/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp @@ -23,10 +23,13 @@ namespace DB { +/// proton: starts namespace ErrorCodes { extern const int INVALID_DATA; +extern const int INVALID_SETTING_VALUE; } +/// proton: ends ProtobufRowInputFormat::ProtobufRowInputFormat( ReadBuffer & in_, const Block & header_, const Params & params_, const FormatSchemaInfo & schema_info_, bool with_length_delimiter_) @@ -101,6 +104,9 @@ void registerInputFormatProtobuf(FormatFactory & factory){ return std::make_shared( buf, sample, 
std::move(params), FormatSchemaInfo(settings, "Protobuf", true), /*with_length_delimiter=*/false); + if (!settings.schema.format_schema.empty()) + throw Exception(ErrorCodes::INVALID_SETTING_VALUE, "kafka_schema_registry_url and format_schema cannot be used at the same time"); + return std::make_shared( buf, sample, std::move(params), settings); }); From b64f437d3a4a3c587a632a455462172720d83f60 Mon Sep 17 00:00:00 2001 From: Gimi Liang Date: Mon, 26 Feb 2024 03:04:43 -0800 Subject: [PATCH 09/16] small refactors --- src/Formats/FormatSchemaFactory.cpp | 31 +++++++------- src/Formats/ProtobufSchemas.cpp | 42 ++++++------------- src/Formats/ProtobufSchemas.h | 15 +++---- src/Processors/Formats/ISchemaWriter.h | 18 +------- .../Formats/Impl/ProtobufRowInputFormat.cpp | 32 ++++---------- .../Formats/Impl/ProtobufRowInputFormat.h | 6 ++- 6 files changed, 51 insertions(+), 93 deletions(-) diff --git a/src/Formats/FormatSchemaFactory.cpp b/src/Formats/FormatSchemaFactory.cpp index dca1ee85a7b..5aaf23a5853 100644 --- a/src/Formats/FormatSchemaFactory.cpp +++ b/src/Formats/FormatSchemaFactory.cpp @@ -4,6 +4,10 @@ #include #include +/// proton: starts +#include +/// proton: ends + namespace DB { @@ -18,24 +22,12 @@ extern const int INVALID_DATA; namespace { + String basename(const std::filesystem::path & path) { return path.filename().replace_extension().string(); } -String formatSchemaValidationErrors(SchemaValidationErrors errors) -{ - assert(!errors.empty()); - String ret; - for (size_t i = 0; i < errors.size(); ++i) - { - if (i > 0) - ret.append("; "); - ret.append(fmt::format("line: {}, columns: {}, error: {}", errors[i].line, errors[i].col, errors[i].error)); - } - - return ret; -} } void FormatSchemaFactory::registerSchema(const String & schema_name, const String & format, std::string_view schema_body, ExistsOP exists_op, ContextPtr & context) @@ -53,8 +45,17 @@ void FormatSchemaFactory::registerSchema(const String & schema_name, const Strin std::lock_guard lock(mutex); auto writer = FormatFactory::instance().getExternalSchemaWriter(format, schema_body, context, format_settings); assert(writer); /* confirmed with checkSchemaType */ - if (auto errors = writer->validate(); !errors.empty()) - throw Exception(ErrorCodes::INVALID_DATA, "Invalid Protobuf schema, errors: {}", formatSchemaValidationErrors(errors)); + + try + { + writer->validate(); + } + catch (DB::Exception & e) + { + e.addMessage(std::format("{} schema {} was invalid", format, schema_name)); + e.rethrow(); + } + auto result = writer->write(exists_op == ExistsOP::Replace); if (!result && exists_op == ExistsOP::Throw) diff --git a/src/Formats/ProtobufSchemas.cpp b/src/Formats/ProtobufSchemas.cpp index e039442db2a..76fab9294a8 100644 --- a/src/Formats/ProtobufSchemas.cpp +++ b/src/Formats/ProtobufSchemas.cpp @@ -16,33 +16,8 @@ namespace ErrorCodes { extern const int BAD_ARGUMENTS; extern const int CANNOT_PARSE_PROTOBUF_SCHEMA; - /// proton: starts - extern const int INVALID_DATA; - /// proton: ends } -/// proton: starts -namespace -{ -class ErrorCollector final: public google::protobuf::io::ErrorCollector -{ -private: - SchemaValidationErrors errors; - -public: - void AddError(int line, google::protobuf::io::ColumnNumber column, const std::string & message) override - { - errors.emplace_back(line, column, message); - } - - const SchemaValidationErrors & getErrors() const - { - return errors; - } -}; -} -/// proton: starts - ProtobufSchemas & ProtobufSchemas::instance() { static ProtobufSchemas instance; @@ -108,17 +83,24 @@ const 
google::protobuf::Descriptor * ProtobufSchemas::getMessageTypeForFormatSch } /// proton: starts -SchemaValidationErrors ProtobufSchemas::validateSchema(std::string_view schema) +/// Overrides google::protobuf::io::ErrorCollector: +void ProtobufSchemas::AddError(int line, google::protobuf::io::ColumnNumber column, const std::string & message) +{ + throw Exception( + "Cannot parse schema, found an error at line " + std::to_string(line) + ", column " + std::to_string(column) + + ", " + message, + ErrorCodes::CANNOT_PARSE_PROTOBUF_SCHEMA); +} + +void ProtobufSchemas::validateSchema(std::string_view schema) { google::protobuf::io::ArrayInputStream input{schema.data(), static_cast(schema.size())}; - ErrorCollector error_collector; - google::protobuf::io::Tokenizer tokenizer(&input, &error_collector); + google::protobuf::io::Tokenizer tokenizer(&input, this); google::protobuf::FileDescriptorProto descriptor; google::protobuf::compiler::Parser parser; - parser.RecordErrorsTo(&error_collector); + parser.RecordErrorsTo(this); parser.Parse(&tokenizer, &descriptor); - return error_collector.getErrors(); } /// proton: ends diff --git a/src/Formats/ProtobufSchemas.h b/src/Formats/ProtobufSchemas.h index 3869a3e8575..669d813cccc 100644 --- a/src/Formats/ProtobufSchemas.h +++ b/src/Formats/ProtobufSchemas.h @@ -11,6 +11,7 @@ #include #include #include +#include namespace google @@ -24,28 +25,28 @@ namespace protobuf namespace DB { class FormatSchemaInfo; -/// proton: starts -struct SchemaValidationError; -using SchemaValidationErrors = std::vector; -/// proton: ends /** Keeps parsed google protobuf schemas parsed from files. * This class is used to handle the "Protobuf" input/output formats. */ -class ProtobufSchemas : private boost::noncopyable +class ProtobufSchemas : private boost::noncopyable, public google::protobuf::io::ErrorCollector /* proton: updated */ { public: static ProtobufSchemas & instance(); ProtobufSchemas(); - ~ProtobufSchemas(); + ~ProtobufSchemas() override; /* proton: updated */ /// Parses the format schema, then parses the corresponding proto file, and returns the descriptor of the message type. /// The function never returns nullptr, it throws an exception if it cannot load or parse the file. const google::protobuf::Descriptor * getMessageTypeForFormatSchema(const FormatSchemaInfo & info); /// proton: starts - SchemaValidationErrors validateSchema(std::string_view schema); + void AddError(int line, google::protobuf::io::ColumnNumber column, const std::string & message) override; + + /// Validates the given schema and throw a DB::Exception if the schema is invalid. + /// The exception will contain the first error encountered when validating the schema, i.e. there could be more errors. + void validateSchema(std::string_view schema); /// proton: ends private: class ImporterWithSourceTree; diff --git a/src/Processors/Formats/ISchemaWriter.h b/src/Processors/Formats/ISchemaWriter.h index 93f02a744b6..760b7aecba7 100644 --- a/src/Processors/Formats/ISchemaWriter.h +++ b/src/Processors/Formats/ISchemaWriter.h @@ -6,20 +6,6 @@ namespace DB { -struct SchemaValidationError final -{ - /// On which line the error happens. If line < 0, it indicates the error is not related to specific line, - /// or the validator is unable to provide such information. - int line; - /// At which column the error happens. If col < 0, it indicates the error is not related to specific column, - /// or the validator is unable to provide such information. 
- int col; - /// Error message - String error; -}; - -using SchemaValidationErrors = std::vector; - class IExternalSchemaWriter { public: @@ -30,8 +16,8 @@ class IExternalSchemaWriter virtual ~IExternalSchemaWriter() = default; - /// Validates the schema input. - virtual SchemaValidationErrors validate() = 0; + /// Validates the schema input. Should throws exceptions on validation failures. + virtual void validate() = 0; /// Persistents the schema. /// If the schema already exists, and replace_if_exist is false, it returns false. diff --git a/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp b/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp index 0456541bf39..620574bb92a 100644 --- a/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp @@ -155,31 +155,18 @@ std::shared_ptr getConfluentSchemaRegistry(const String } } -namespace -{ -class ErrorThrower final: public google::protobuf::io::ErrorCollector +class ProtobufConfluentRowInputFormat::SchemaRegistryWithCache: public google::protobuf::io::ErrorCollector { public: - explicit ErrorThrower(UInt32 schema_id_): schema_id(schema_id_) + SchemaRegistryWithCache(const String & base_url, const String & credentials) + : registry(base_url, credentials) { } + /// Overrides google::protobuf::io::ErrorCollector. void AddError(int line, google::protobuf::io::ColumnNumber column, const std::string & message) override { - throw Exception(ErrorCodes::INVALID_DATA, "Failed to parse schema {}: line={}, column={}, message={}", schema_id, line, column, message); - } - -private: - UInt32 schema_id; -}; -} - -class ProtobufConfluentRowInputFormat::SchemaRegistryWithCache -{ -public: - SchemaRegistryWithCache(const String & base_url, const String & credentials) - : registry(base_url, credentials) - { + throw Exception(ErrorCodes::INVALID_DATA, "Failed to parse schema, line={}, column={}, message={}", line, column, message); } const google::protobuf::Descriptor * getMessageType(uint32_t schema_id, const std::vector & indexes) @@ -216,14 +203,13 @@ class ProtobufConfluentRowInputFormat::SchemaRegistryWithCache const google::protobuf::FileDescriptor* fetchSchema(uint32_t id) { auto schema = registry.fetchSchema(id); - ErrorThrower err_thrower {id}; std::string schema_content {schema.data(), schema.size()}; google::protobuf::io::ArrayInputStream input{schema.data(), static_cast(schema.size())}; - google::protobuf::io::Tokenizer tokenizer(&input, &err_thrower); + google::protobuf::io::Tokenizer tokenizer(&input, this); google::protobuf::FileDescriptorProto descriptor; descriptor.set_name(std::to_string(id)); google::protobuf::compiler::Parser parser; - parser.RecordErrorsTo(&err_thrower); + parser.RecordErrorsTo(this); parser.Parse(&tokenizer, &descriptor); auto const * ret = registry_pool()->BuildFile(descriptor); @@ -302,9 +288,9 @@ ProtobufSchemaWriter::ProtobufSchemaWriter(std::string_view schema_body_, const { } -SchemaValidationErrors ProtobufSchemaWriter::validate() +void ProtobufSchemaWriter::validate() { - return ProtobufSchemas::instance().validateSchema(schema_body); + ProtobufSchemas::instance().validateSchema(schema_body); } bool ProtobufSchemaWriter::write(bool replace_if_exist) diff --git a/src/Processors/Formats/Impl/ProtobufRowInputFormat.h b/src/Processors/Formats/Impl/ProtobufRowInputFormat.h index e2276f922c7..23a4f32a57a 100644 --- a/src/Processors/Formats/Impl/ProtobufRowInputFormat.h +++ b/src/Processors/Formats/Impl/ProtobufRowInputFormat.h @@ -1,6 +1,5 @@ #pragma once 
-#include "Formats/KafkaSchemaRegistry.h" #include "config.h" #if USE_PROTOBUF @@ -8,6 +7,9 @@ # include # include # include +/// proton: starts +# include +/// proton: ends namespace DB { @@ -96,7 +98,7 @@ class ProtobufSchemaWriter : public IExternalSchemaWriter public: explicit ProtobufSchemaWriter(std::string_view schema_body_, const FormatSettings & settings_); - SchemaValidationErrors validate() override; + void validate() override; bool write(bool replace_if_exist) override; private: From 3c3994d4cfba7a09aebe7b2901b8c1cbf776de6f Mon Sep 17 00:00:00 2001 From: Gimi Liang Date: Mon, 26 Feb 2024 03:10:12 -0800 Subject: [PATCH 10/16] unwanted changes --- src/KafkaLog/KafkaWALPool.cpp | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/KafkaLog/KafkaWALPool.cpp b/src/KafkaLog/KafkaWALPool.cpp index b8e22363ef2..872bc010801 100644 --- a/src/KafkaLog/KafkaWALPool.cpp +++ b/src/KafkaLog/KafkaWALPool.cpp @@ -439,13 +439,13 @@ KafkaWALSimpleConsumerPtr KafkaWALPool::getOrCreateStreamingExternal(const Strin /// Create one auto ksettings = std::make_unique(); - ksettings->fetch_wait_max_ms = fetch_wait_max_ms; + ksettings->fetch_wait_max_ms = fetch_wait_max_ms; - ksettings->brokers = brokers; + ksettings->brokers = brokers; - /// Streaming WALs have a different group ID - ksettings->group_id += "-tp-external-streaming-query-" + std::to_string(consumers.second.size() + 1); - ksettings->auth = auth; + /// Streaming WALs have a different group ID + ksettings->group_id += "-tp-external-streaming-query-" + std::to_string(consumers.second.size() + 1); + ksettings->auth = auth; /// We don't care offset checkpointing for WALs used for streaming processing, /// No auto commit From dc46803b9e83869dbea6d32f963b67a095964076 Mon Sep 17 00:00:00 2001 From: Gimi Liang Date: Mon, 26 Feb 2024 03:16:59 -0800 Subject: [PATCH 11/16] fixed format --- .../Formats/Impl/AvroRowInputFormat.cpp | 6 ++-- .../Formats/Impl/AvroRowInputFormat.h | 4 +-- .../Formats/Impl/ProtobufRowInputFormat.cpp | 36 ++++++++++--------- 3 files changed, 25 insertions(+), 21 deletions(-) diff --git a/src/Processors/Formats/Impl/AvroRowInputFormat.cpp b/src/Processors/Formats/Impl/AvroRowInputFormat.cpp index f4a7c905cad..cec23704d0b 100644 --- a/src/Processors/Formats/Impl/AvroRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/AvroRowInputFormat.cpp @@ -103,7 +103,7 @@ class InputStreamReadBufferAdapter : public avro::InputStream *data = reinterpret_cast(in.position()); *len = in.available(); - in.position() += in.available(); + in.position() += in.available(); return true; } @@ -827,12 +827,12 @@ AvroConfluentRowInputFormat::AvroConfluentRowInputFormat( const Block & header_, ReadBuffer & in_, Params params_, const FormatSettings & format_settings_) : IRowInputFormat(header_, in_, params_, ProcessorID::AvroConfluentRowInputFormatID) , schema_registry(getConfluentSchemaRegistry(format_settings_)) - /// , input_stream(std::make_unique(*in)) /* proton: updates */ + /// , input_stream(std::make_unique(*in)) /* proton: updated */ , decoder(avro::binaryDecoder()) , format_settings(format_settings_) { - /// decoder->init(*input_stream); /* proton: updates */ + /// decoder->init(*input_stream); /* proton: updated */ } bool AvroConfluentRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & ext) diff --git a/src/Processors/Formats/Impl/AvroRowInputFormat.h b/src/Processors/Formats/Impl/AvroRowInputFormat.h index 8b98255bae8..5fda734efc1 100644 --- a/src/Processors/Formats/Impl/AvroRowInputFormat.h +++ 
b/src/Processors/Formats/Impl/AvroRowInputFormat.h @@ -145,7 +145,7 @@ class AvroConfluentRowInputFormat final : public IRowInputFormat class SchemaRegistry; private: - bool readRow(MutableColumns & columns, RowReadExtension & ext) override; /* proton: updates */ + bool readRow(MutableColumns & columns, RowReadExtension & ext) override; /* proton: updated */ bool allowSyncAfterError() const override { return true; } void syncAfterError() override; @@ -155,7 +155,7 @@ class AvroConfluentRowInputFormat final : public IRowInputFormat std::unordered_map deserializer_cache; const AvroDeserializer & getOrCreateDeserializer(SchemaId schema_id); - /// avro::InputStreamPtr input_stream; /* proton: updates */ + /// avro::InputStreamPtr input_stream; /* proton: updated */ avro::DecoderPtr decoder; FormatSettings format_settings; }; diff --git a/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp b/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp index 620574bb92a..f767ad4f740 100644 --- a/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp @@ -1,23 +1,23 @@ #include "ProtobufRowInputFormat.h" #if USE_PROTOBUF -# include -# include -# include -# include -# include -# include -# include -# include -# include +# include +# include +# include +# include +# include +# include +# include +# include +# include /// proton: starts -# include -# include -# include -# include -# include -# include +# include +# include +# include +# include +# include +# include /// proton: ends namespace DB @@ -124,7 +124,11 @@ void registerInputFormatProtobuf(FormatFactory & factory){ ProtobufSchemaReader::ProtobufSchemaReader(const FormatSettings & format_settings) : schema_info( - format_settings.schema.format_schema, "Protobuf", true, format_settings.schema.is_server, format_settings.schema.format_schema_path) + format_settings.schema.format_schema, + "Protobuf", + true, + format_settings.schema.is_server, + format_settings.schema.format_schema_path) { } From 866a3ea585ba30ba55ea6b0e95561e05127ae9f0 Mon Sep 17 00:00:00 2001 From: Gimi Liang Date: Mon, 26 Feb 2024 10:21:33 -0800 Subject: [PATCH 12/16] fixed smoke tests --- tests/stream/test_stream_smoke/0033_format_schema.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/stream/test_stream_smoke/0033_format_schema.yaml b/tests/stream/test_stream_smoke/0033_format_schema.yaml index 83ae747529b..59995f93964 100644 --- a/tests/stream/test_stream_smoke/0033_format_schema.yaml +++ b/tests/stream/test_stream_smoke/0033_format_schema.yaml @@ -144,13 +144,13 @@ tests: expected_results: - query_id: fmt-2-0 # unknown format type - expected_results: error_code:73 + expected_results: error_code:2613 - query_id: fmt-2-1 # empty body expected_results: error_code:36 - query_id: fmt-2-2 # invalid schema - expected_results: error_code:2504 + expected_results: error_code:434 - query_id: fmt-2-3 # nonexists expected_results: error_code:2611 From b696bb092e756c785e0a9e1cfc16ee4a34db993f Mon Sep 17 00:00:00 2001 From: Gimi Liang Date: Mon, 26 Feb 2024 12:27:54 -0800 Subject: [PATCH 13/16] fixed per review comments --- src/Formats/FormatSchemaFactory.cpp | 2 - src/Formats/KafkaSchemaRegistry.cpp | 32 ++--- src/Formats/KafkaSchemaRegistry.h | 5 +- src/Formats/ProtobufSchemas.cpp | 6 +- src/Formats/ProtobufSchemas.h | 9 +- src/Processors/Formats/ISchemaWriter.h | 2 +- .../Formats/Impl/AvroRowInputFormat.cpp | 136 ++---------------- .../Formats/Impl/AvroRowInputFormat.h | 1 - 
.../Formats/Impl/ProtobufRowInputFormat.cpp | 56 ++++---- .../Formats/Impl/ProtobufRowInputFormat.h | 3 +- 10 files changed, 69 insertions(+), 183 deletions(-) diff --git a/src/Formats/FormatSchemaFactory.cpp b/src/Formats/FormatSchemaFactory.cpp index 5aaf23a5853..615c205c87a 100644 --- a/src/Formats/FormatSchemaFactory.cpp +++ b/src/Formats/FormatSchemaFactory.cpp @@ -4,9 +4,7 @@ #include #include -/// proton: starts #include -/// proton: ends namespace DB { diff --git a/src/Formats/KafkaSchemaRegistry.cpp b/src/Formats/KafkaSchemaRegistry.cpp index 637dcb1159c..a90e7d774ec 100644 --- a/src/Formats/KafkaSchemaRegistry.cpp +++ b/src/Formats/KafkaSchemaRegistry.cpp @@ -1,6 +1,8 @@ #include #include #include +#include + #include #include #include @@ -13,13 +15,13 @@ namespace ErrorCodes extern const int INCORRECT_DATA; } -KafkaSchemaRegistry::KafkaSchemaRegistry(const String & base_url_, const String & credentials_): base_url(base_url_) +KafkaSchemaRegistry::KafkaSchemaRegistry(const String & base_url_, const String & credentials_) + : base_url(base_url_) + , logger(&Poco::Logger::get("KafkaSchemaRegistry")) { - if (credentials_.empty()) - return; + assert(!base_url.empty()); - auto pos = credentials_.find(':'); - if (pos == credentials_.npos) + if (auto pos = credentials_.find(':'); pos == credentials_.npos) credentials.setUsername(credentials_); else { @@ -30,14 +32,12 @@ KafkaSchemaRegistry::KafkaSchemaRegistry(const String & base_url_, const String String KafkaSchemaRegistry::fetchSchema(UInt32 id) { - assert(!base_url.empty()); - try { try { - Poco::URI url(base_url, "/schemas/ids/" + std::to_string(id)); - LOG_TRACE((&Poco::Logger::get("KafkaSchemaRegistry")), "Fetching schema id = {}", id); + Poco::URI url(base_url, std::format("/schemas/ids/{}", id)); + LOG_TRACE(logger, "Fetching schema id = {}", id); /// One second for connect/send/receive. Just in case. ConnectionTimeouts timeouts({1, 0}, {1, 0}, {1, 0}); @@ -67,7 +67,7 @@ String KafkaSchemaRegistry::fetchSchema(UInt32 id) Poco::JSON::Parser parser; auto json_body = parser.parse(*response_body).extract(); auto schema = json_body->getValue("schema"); - LOG_TRACE((&Poco::Logger::get("KafkaSchemaRegistry")), "Successfully fetched schema id = {}\n{}", id, schema); + LOG_TRACE(logger, "Successfully fetched schema id = {}\n{}", id, schema); return schema; } catch (const Exception &) @@ -81,7 +81,7 @@ String KafkaSchemaRegistry::fetchSchema(UInt32 id) } catch (Exception & e) { - e.addMessage("while fetching schema id = " + std::to_string(id)); + e.addMessage(std::format("while fetching schema with id {}", id)); throw; } } @@ -99,19 +99,15 @@ UInt32 KafkaSchemaRegistry::readSchemaId(ReadBuffer & in) catch (const Exception & e) { if (e.code() == ErrorCodes::CANNOT_READ_ALL_DATA) - { /// empty or incomplete message without magic byte or schema id - throw Exception("Missing magic byte or schema identifier.", ErrorCodes::INCORRECT_DATA); - } + throw Exception(ErrorCodes::INCORRECT_DATA, "Missing magic byte or schema identifier."); else throw; } if (magic != 0x00) - { - throw Exception("Invalid magic byte before schema identifier." - " Must be zero byte, found " + std::to_string(int(magic)) + " instead", ErrorCodes::INCORRECT_DATA); - } + throw Exception(ErrorCodes::INCORRECT_DATA, "Invalid magic byte before schema identifier." 
+ " Must be zero byte, found 0x{:x} instead", magic); return schema_id; } diff --git a/src/Formats/KafkaSchemaRegistry.h b/src/Formats/KafkaSchemaRegistry.h index 8f2b1d3154d..db6b87794ac 100644 --- a/src/Formats/KafkaSchemaRegistry.h +++ b/src/Formats/KafkaSchemaRegistry.h @@ -1,6 +1,7 @@ #pragma once #include + #include #include @@ -13,7 +14,7 @@ class KafkaSchemaRegistry final public: static UInt32 readSchemaId(ReadBuffer & in); - /// `credentials_` is expected to be formatted in ":". + /// \param credentials_ is expected to be formatted in ":". KafkaSchemaRegistry(const String & base_url_, const String & credentials_); String fetchSchema(UInt32 id); @@ -21,6 +22,8 @@ class KafkaSchemaRegistry final private: Poco::URI base_url; Poco::Net::HTTPBasicCredentials credentials; + + Poco::Logger* logger; }; } diff --git a/src/Formats/ProtobufSchemas.cpp b/src/Formats/ProtobufSchemas.cpp index 76fab9294a8..a8182ac9392 100644 --- a/src/Formats/ProtobufSchemas.cpp +++ b/src/Formats/ProtobufSchemas.cpp @@ -86,10 +86,8 @@ const google::protobuf::Descriptor * ProtobufSchemas::getMessageTypeForFormatSch /// Overrides google::protobuf::io::ErrorCollector: void ProtobufSchemas::AddError(int line, google::protobuf::io::ColumnNumber column, const std::string & message) { - throw Exception( - "Cannot parse schema, found an error at line " + std::to_string(line) + ", column " + std::to_string(column) - + ", " + message, - ErrorCodes::CANNOT_PARSE_PROTOBUF_SCHEMA); + throw Exception(ErrorCodes::CANNOT_PARSE_PROTOBUF_SCHEMA, + "Cannot parse schema, found an error at line {}, column {}, error: {}", line, column, message); } void ProtobufSchemas::validateSchema(std::string_view schema) diff --git a/src/Formats/ProtobufSchemas.h b/src/Formats/ProtobufSchemas.h index 669d813cccc..aa12ce32a3b 100644 --- a/src/Formats/ProtobufSchemas.h +++ b/src/Formats/ProtobufSchemas.h @@ -3,15 +3,18 @@ #include "config.h" #if USE_PROTOBUF -#include - #include #include #include #include #include + +/// proton: starts +#include + #include #include +/// proton: ends namespace google @@ -29,7 +32,7 @@ class FormatSchemaInfo; /** Keeps parsed google protobuf schemas parsed from files. * This class is used to handle the "Protobuf" input/output formats. */ -class ProtobufSchemas : private boost::noncopyable, public google::protobuf::io::ErrorCollector /* proton: updated */ +class ProtobufSchemas : public google::protobuf::io::ErrorCollector /* proton: updated */ { public: static ProtobufSchemas & instance(); diff --git a/src/Processors/Formats/ISchemaWriter.h b/src/Processors/Formats/ISchemaWriter.h index 760b7aecba7..6fb6190fa36 100644 --- a/src/Processors/Formats/ISchemaWriter.h +++ b/src/Processors/Formats/ISchemaWriter.h @@ -16,7 +16,7 @@ class IExternalSchemaWriter virtual ~IExternalSchemaWriter() = default; - /// Validates the schema input. Should throws exceptions on validation failures. + /// Validates the schema input. Should throw exceptions on validation failures. virtual void validate() = 0; /// Persistents the schema. 
diff --git a/src/Processors/Formats/Impl/AvroRowInputFormat.cpp b/src/Processors/Formats/Impl/AvroRowInputFormat.cpp index cec23704d0b..23e57025a85 100644 --- a/src/Processors/Formats/Impl/AvroRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/AvroRowInputFormat.cpp @@ -67,6 +67,7 @@ /// proton: starts #include +#include /// proton: ends namespace DB @@ -649,88 +650,8 @@ bool AvroRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &ext } /// proton: starts -/// class AvroConfluentRowInputFormat::SchemaRegistry -/// { -/// public: -/// explicit SchemaRegistry(const std::string & base_url_, size_t schema_cache_max_size = 1000) -/// : base_url(base_url_), schema_cache(schema_cache_max_size) -/// { -/// if (base_url.empty()) -/// throw Exception("Empty Schema Registry URL", ErrorCodes::BAD_ARGUMENTS); -/// } -/// -/// avro::ValidSchema getSchema(uint32_t id) -/// { -/// auto [schema, loaded] = schema_cache.getOrSet( -/// id, -/// [this, id](){ return std::make_shared(fetchSchema(id)); } -/// ); -/// return *schema; -/// } -/// -/// private: -/// avro::ValidSchema fetchSchema(uint32_t id) -/// { -/// try -/// { -/// try -/// { -/// Poco::URI url(base_url, "/schemas/ids/" + std::to_string(id)); -/// LOG_TRACE((&Poco::Logger::get("AvroConfluentRowInputFormat")), "Fetching schema id = {}", id); -/// -/// /// One second for connect/send/receive. Just in case. -/// ConnectionTimeouts timeouts({1, 0}, {1, 0}, {1, 0}); -/// -/// Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_GET, url.getPathAndQuery(), Poco::Net::HTTPRequest::HTTP_1_1); -/// request.setHost(url.getHost()); -/// -/// auto session = makePooledHTTPSession(url, timeouts, 1); -/// std::istream * response_body{}; -/// try -/// { -/// session->sendRequest(request); -/// -/// Poco::Net::HTTPResponse response; -/// response_body = receiveResponse(*session, request, response, false); -/// } -/// catch (const Poco::Exception & e) -/// { -/// /// We use session data storage as storage for exception text -/// /// Depend on it we can deduce to reconnect session or reresolve session host -/// session->attachSessionData(e.message()); -/// throw; -/// } -/// Poco::JSON::Parser parser; -/// auto json_body = parser.parse(*response_body).extract(); -/// auto schema = json_body->getValue("schema"); -/// LOG_TRACE((&Poco::Logger::get("AvroConfluentRowInputFormat")), "Successfully fetched schema id = {}\n{}", id, schema); -/// -/// return avro::compileJsonSchemaFromString(schema); -/// } -/// catch (const Exception &) -/// { -/// throw; -/// } -/// catch (const Poco::Exception & e) -/// { -/// throw Exception(Exception::CreateFromPocoTag{}, e); -/// } -/// catch (const avro::Exception & e) -/// { -/// throw Exception(e.what(), ErrorCodes::INCORRECT_DATA); -/// } -/// } -/// catch (Exception & e) -/// { -/// e.addMessage("while fetching schema id = " + std::to_string(id)); -/// throw; -/// } -/// } -/// -/// Poco::URI base_url; -/// LRUCache schema_cache; -/// }; - +/// We have refactored this class from the original implementation by extracting the schema registry related code +/// out to KafkaSchemaRegistry, to support other formats which also can be used with kafka schema registry. 
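/// (What remains format-specific after the extraction: fetchSchema() below compiles the JSON
/// schema text fetched from the registry with avro::compileJsonSchemaFromString(), and the
/// resulting avro::ValidSchema feeds the per-schema-id AvroDeserializer cache used by readRow().)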
class AvroConfluentRowInputFormat::SchemaRegistry { public: @@ -760,7 +681,7 @@ class AvroConfluentRowInputFormat::SchemaRegistry catch (const avro::Exception & e) { auto ex = Exception(e.what(), ErrorCodes::INCORRECT_DATA); - ex.addMessage("while fetching schema id = " + std::to_string(id)); + ex.addMessage(std::format("while fetching schema id = {}", id)); throw std::move(ex); } } @@ -782,7 +703,7 @@ static std::shared_ptr getConfluentSchemaRegistry(const const auto & credentials = format_settings.schema.kafka_schema_registry_credentials; auto [schema_registry, loaded] = schema_registry_cache.getOrSet( base_url + credentials, - [base_url, credentials]() + [&base_url, &credentials]() { return std::make_shared(base_url, credentials); } @@ -791,48 +712,14 @@ static std::shared_ptr getConfluentSchemaRegistry(const return schema_registry; } -/// proton: starts -/// static uint32_t readConfluentSchemaId(ReadBuffer & in) -/// { -/// uint8_t magic; -/// uint32_t schema_id; -/// -/// try -/// { -/// readBinaryBigEndian(magic, in); -/// readBinaryBigEndian(schema_id, in); -/// } -/// catch (const Exception & e) -/// { -/// if (e.code() == ErrorCodes::CANNOT_READ_ALL_DATA) -/// { -/// /* empty or incomplete message without Avro Confluent magic number or schema id */ -/// throw Exception("Missing AvroConfluent magic byte or schema identifier.", ErrorCodes::INCORRECT_DATA); -/// } -/// else -/// throw; -/// } -/// -/// if (magic != 0x00) -/// { -/// throw Exception("Invalid magic byte before AvroConfluent schema identifier." -/// " Must be zero byte, found " + std::to_string(int(magic)) + " instead", ErrorCodes::INCORRECT_DATA); -/// } -/// -/// return schema_id; -/// } -/// proton: ends - AvroConfluentRowInputFormat::AvroConfluentRowInputFormat( const Block & header_, ReadBuffer & in_, Params params_, const FormatSettings & format_settings_) : IRowInputFormat(header_, in_, params_, ProcessorID::AvroConfluentRowInputFormatID) , schema_registry(getConfluentSchemaRegistry(format_settings_)) - /// , input_stream(std::make_unique(*in)) /* proton: updated */ , decoder(avro::binaryDecoder()) , format_settings(format_settings_) { - /// decoder->init(*input_stream); /* proton: updated */ } bool AvroConfluentRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & ext) @@ -972,9 +859,11 @@ void registerInputFormatAvro(FormatFactory & factory) const FormatSettings & settings) -> InputFormatPtr { /// proton: starts - /// Use only one format name "Avro" to support both shema registry and non-schema registry use cases, rather than using another name "AvroConfluent" + /// Use only one format name "Avro" to support both schema registry and non-schema registry use cases, rather than using another name "AvroConfluent", + /// which is what ClickHouse did.
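            /// Dispatch, concretely: with no registry URL configured the data is treated as
            /// plain Avro; with a registry URL set through either setting, the Confluent-framed
            /// path below is taken, and combining it with format_schema is rejected.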
if (settings.avro.schema_registry_url.empty() && settings.schema.kafka_schema_registry_url.empty()) + /// Non-schema registry case return std::make_shared(sample, buf, params, settings); if (!settings.schema.format_schema.empty()) @@ -982,15 +871,6 @@ void registerInputFormatAvro(FormatFactory & factory) return std::make_shared(sample, buf, params, settings); /// proton: ends }); - - factory.registerInputFormat("AvroConfluent",[]( - ReadBuffer & buf, - const Block & sample, - const RowInputFormatParams & params, - const FormatSettings & settings) - { - return std::make_shared(sample, buf, params, settings); - }); } void registerAvroSchemaReader(FormatFactory & factory) diff --git a/src/Processors/Formats/Impl/AvroRowInputFormat.h b/src/Processors/Formats/Impl/AvroRowInputFormat.h index 5fda734efc1..3e333682599 100644 --- a/src/Processors/Formats/Impl/AvroRowInputFormat.h +++ b/src/Processors/Formats/Impl/AvroRowInputFormat.h @@ -155,7 +155,6 @@ class AvroConfluentRowInputFormat final : public IRowInputFormat std::unordered_map deserializer_cache; const AvroDeserializer & getOrCreateDeserializer(SchemaId schema_id); - /// avro::InputStreamPtr input_stream; /* proton: updated */ avro::DecoderPtr decoder; FormatSettings format_settings; }; diff --git a/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp b/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp index f767ad4f740..82747b9624e 100644 --- a/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp @@ -15,6 +15,8 @@ # include # include # include +# include + # include # include # include @@ -142,15 +144,19 @@ NamesAndTypesList ProtobufSchemaReader::readSchema() namespace { using ConfluentSchemaRegistry = ProtobufConfluentRowInputFormat::SchemaRegistryWithCache; -#define SCHEMA_REGISTRY_CACHE_MAX_SIZE 1000 -/// Cache of Schema Registry URL + credentials -> SchemaRegistry -LRUCache schema_registry_cache(SCHEMA_REGISTRY_CACHE_MAX_SIZE); + +auto & schemaRegistryCache() +{ + /// Cache of Schema Registry URL + credentials -> SchemaRegistry + static LRUCache schema_registry_cache(/*max_size_=*/1000); + return schema_registry_cache; +} std::shared_ptr getConfluentSchemaRegistry(const String & base_url, const String & credentials) { - auto [schema_registry, loaded] = schema_registry_cache.getOrSet( + auto [schema_registry, loaded] = schemaRegistryCache().getOrSet( base_url + credentials, - [base_url, credentials]() + [&base_url, &credentials]() { return std::make_shared(base_url, credentials); } @@ -177,16 +183,19 @@ class ProtobufConfluentRowInputFormat::SchemaRegistryWithCache: public google::p { assert(!indexes.empty()); - const auto *fd = getSchema(schema_id); + const auto * fd = getSchema(schema_id); + + /// Check https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format + /// for how to get the message with the indexes. 
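        /// Concretely, the indexes array walks the compiled .proto file: [0] selects the first
        /// top-level message (the common case, which the Confluent framing may also encode as a
        /// single zero byte), while e.g. [1, 0] would select the first nested message inside the
        /// second top-level message. The checks and loop below perform exactly that walk.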
auto mt_count = fd->message_type_count(); if (mt_count < indexes[0] + 1) throw Exception(ErrorCodes::INVALID_DATA, "Invalid message index={} max_index={}", indexes[0], mt_count); - const auto *descriptor = fd->message_type(indexes[0]); + const auto * descriptor = fd->message_type(indexes[0]); - for (auto i : std::vector(indexes.begin() + 1, indexes.end())) + for (auto i : std::span(indexes.begin() + 1, indexes.end())) { - if (i > static_cast(descriptor->nested_type_count())) + if (i > descriptor->nested_type_count()) throw Exception(ErrorCodes::INVALID_DATA, "Invalid message index={} max_index={} descriptor={}", i, descriptor->nested_type_count(), descriptor->name()); descriptor = descriptor->nested_type(i); } @@ -197,36 +206,33 @@ class ProtobufConfluentRowInputFormat::SchemaRegistryWithCache: public google::p private: const google::protobuf::FileDescriptor * getSchema(uint32_t id) { - const auto * loaded_descriptor = registry_pool()->FindFileByName(std::to_string(id)); + const auto * loaded_descriptor = descriptor_pool.FindFileByName(std::to_string(id)); if (loaded_descriptor) return loaded_descriptor; return fetchSchema(id); } - const google::protobuf::FileDescriptor* fetchSchema(uint32_t id) + const google::protobuf::FileDescriptor * fetchSchema(uint32_t id) { auto schema = registry.fetchSchema(id); - std::string schema_content {schema.data(), schema.size()}; google::protobuf::io::ArrayInputStream input{schema.data(), static_cast(schema.size())}; google::protobuf::io::Tokenizer tokenizer(&input, this); - google::protobuf::FileDescriptorProto descriptor; - descriptor.set_name(std::to_string(id)); + google::protobuf::FileDescriptorProto file_descriptor; + file_descriptor.set_name(std::to_string(id)); google::protobuf::compiler::Parser parser; parser.RecordErrorsTo(this); - parser.Parse(&tokenizer, &descriptor); + parser.Parse(&tokenizer, &file_descriptor); - auto const * ret = registry_pool()->BuildFile(descriptor); - auto mt_count = ret->message_type_count(); - if (mt_count < 1) - throw Exception(ErrorCodes::INVALID_DATA, "No message type in schema"); - return ret; + auto const * descriptor = descriptor_pool.BuildFile(file_descriptor); + if (descriptor && descriptor->message_type_count() > 0) + return descriptor; + + throw Exception(ErrorCodes::INVALID_DATA, "No message type in schema"); } KafkaSchemaRegistry registry; - google::protobuf::DescriptorPool registry_pool_; - - google::protobuf::DescriptorPool* registry_pool() { return ®istry_pool_; } + google::protobuf::DescriptorPool descriptor_pool; }; ProtobufConfluentRowInputFormat::ProtobufConfluentRowInputFormat( @@ -251,10 +257,12 @@ bool ProtobufConfluentRowInputFormat::readRow(MutableColumns & columns, RowReadE indexes.reserve(indexes_count < 1 ? 1 : indexes_count); if (indexes_count < 1) + { indexes.push_back(0); + } else { - for (size_t i = 0; i < static_cast(indexes_count); i++) + for (Int64 i = 0; i < indexes_count; i++) { Int64 ind = 0; readVarInt(ind, *in); diff --git a/src/Processors/Formats/Impl/ProtobufRowInputFormat.h b/src/Processors/Formats/Impl/ProtobufRowInputFormat.h index 23a4f32a57a..9dc81b4afee 100644 --- a/src/Processors/Formats/Impl/ProtobufRowInputFormat.h +++ b/src/Processors/Formats/Impl/ProtobufRowInputFormat.h @@ -7,6 +7,7 @@ # include # include # include + /// proton: starts # include /// proton: ends @@ -71,7 +72,7 @@ class ProtobufSchemaReader : public IExternalSchemaReader /// proton: starts /// Confluent framing + Protobuf binary datum encoding. Mainly used for Kafka. /// Uses 3 caches: -/// 1. 
global: schema registry cache (base_url -> SchemaRegistry) +/// 1. global: schema registry cache (base_url + credentials -> SchemaRegistry) /// 2. SchemaRegistry: schema cache (schema_id -> schema) /// 3. ProtobufConfluentRowInputFormat: deserializer cache (schema_id -> AvroDeserializer) /// This is needed because KafkaStorage creates a new instance of InputFormat per a batch of messages From 91eaae293f931bb48d2a975d849c1eea093e0db9 Mon Sep 17 00:00:00 2001 From: Gimi Liang Date: Mon, 26 Feb 2024 12:58:30 -0800 Subject: [PATCH 14/16] fixed a race condition --- src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp b/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp index 82747b9624e..98f37053d04 100644 --- a/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp @@ -215,6 +215,12 @@ class ProtobufConfluentRowInputFormat::SchemaRegistryWithCache: public google::p const google::protobuf::FileDescriptor * fetchSchema(uint32_t id) { + std::lock_guard lock(mutex); + /// Just in case we got beaten + const auto * loaded_descriptor = descriptor_pool.FindFileByName(std::to_string(id)); + if (loaded_descriptor) + return loaded_descriptor; + auto schema = registry.fetchSchema(id); google::protobuf::io::ArrayInputStream input{schema.data(), static_cast(schema.size())}; google::protobuf::io::Tokenizer tokenizer(&input, this); @@ -231,6 +237,7 @@ class ProtobufConfluentRowInputFormat::SchemaRegistryWithCache: public google::p throw Exception(ErrorCodes::INVALID_DATA, "No message type in schema"); } + std::mutex mutex; KafkaSchemaRegistry registry; google::protobuf::DescriptorPool descriptor_pool; }; From 4cb8aa2dd8723ad8038e3268785a48791e3d535e Mon Sep 17 00:00:00 2001 From: Gimi Liang Date: Mon, 26 Feb 2024 13:10:52 -0800 Subject: [PATCH 15/16] review comment --- .../Formats/Impl/ProtobufRowInputFormat.cpp | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp b/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp index 98f37053d04..97499bce414 100644 --- a/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp @@ -294,10 +294,13 @@ bool ProtobufConfluentRowInputFormat::readRow(MutableColumns & columns, RowReadE serializer->readRow(row_num); - row_read_extension.read_columns.clear(); - row_read_extension.read_columns.resize(columns.size(), true); - for (size_t column_idx : missing_column_indices) - row_read_extension.read_columns[column_idx] = false; + assert(row_read_extension.read_columns.empty()); + if (!missing_column_indices.empty()) + { + row_read_extension.read_columns.resize(columns.size(), true); + for (size_t column_idx : missing_column_indices) + row_read_extension.read_columns[column_idx] = false; + } return true; } From aa4c34f56e69291a7c3b7458d637efcf3c8d15a2 Mon Sep 17 00:00:00 2001 From: Gimi Liang Date: Mon, 26 Feb 2024 13:19:14 -0800 Subject: [PATCH 16/16] fixed format --- .../Formats/Impl/ProtobufRowInputFormat.cpp | 47 +++++++++---------- 1 file changed, 23 insertions(+), 24 deletions(-) diff --git a/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp b/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp index 97499bce414..77601fcc87f 100644 --- a/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp @@ 
From 4cb8aa2dd8723ad8038e3268785a48791e3d535e Mon Sep 17 00:00:00 2001
From: Gimi Liang
Date: Mon, 26 Feb 2024 13:10:52 -0800
Subject: [PATCH 15/16] review comment

---
 .../Formats/Impl/ProtobufRowInputFormat.cpp | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp b/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp
index 98f37053d04..97499bce414 100644
--- a/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp
+++ b/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp
@@ -294,10 +294,13 @@ bool ProtobufConfluentRowInputFormat::readRow(MutableColumns & columns, RowReadE
 
     serializer->readRow(row_num);
 
-    row_read_extension.read_columns.clear();
-    row_read_extension.read_columns.resize(columns.size(), true);
-    for (size_t column_idx : missing_column_indices)
-        row_read_extension.read_columns[column_idx] = false;
+    assert(row_read_extension.read_columns.empty());
+    if (!missing_column_indices.empty())
+    {
+        row_read_extension.read_columns.resize(columns.size(), true);
+        for (size_t column_idx : missing_column_indices)
+            row_read_extension.read_columns[column_idx] = false;
+    }
 
     return true;
 }
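The review change above relies on a convention, inferred here from the assert it introduces: an empty read_columns mask means "every column was read", so the common path with no missing columns skips the per-row clear/resize allocation entirely. A small sketch of that convention, with hypothetical names:

#include <cassert>
#include <cstddef>
#include <vector>

/// Illustrative mirror of the convention above: an empty mask
/// means "all columns were read for this row".
struct RowExtension
{
    std::vector<bool> read_columns;
};

void markMissingColumns(RowExtension & ext, size_t num_columns, const std::vector<size_t> & missing)
{
    assert(ext.read_columns.empty());

    /// Fast path: nothing is missing, so leave the mask empty and allocate nothing.
    if (missing.empty())
        return;

    ext.read_columns.resize(num_columns, true);
    for (size_t idx : missing)
        ext.read_columns[idx] = false;
}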
From aa4c34f56e69291a7c3b7458d637efcf3c8d15a2 Mon Sep 17 00:00:00 2001
From: Gimi Liang
Date: Mon, 26 Feb 2024 13:19:14 -0800
Subject: [PATCH 16/16] fixed format

---
 .../Formats/Impl/ProtobufRowInputFormat.cpp | 47 +++++++++----------
 1 file changed, 23 insertions(+), 24 deletions(-)

diff --git a/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp b/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp
index 97499bce414..77601fcc87f 100644
--- a/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp
+++ b/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp
@@ -7,8 +7,8 @@
 # include
 # include
 # include
-# include
 # include
+# include
 # include
 
 /// proton: starts
@@ -85,7 +85,8 @@ void ProtobufRowInputFormat::syncAfterError()
     reader->endMessage(true);
 }
 
-void registerInputFormatProtobuf(FormatFactory & factory){
+void registerInputFormatProtobuf(FormatFactory & factory)
+{
     /// proton: starts
     /// for (bool with_length_delimiter : {false, true})
     /// {
@@ -100,28 +101,27 @@ void registerInputFormatProtobuf(FormatFactory & factory){
 
     factory.registerInputFormat(
         "ProtobufSingle",
-        [](ReadBuffer & buf, const Block & sample, IRowInputFormat::Params params, const FormatSettings & settings) -> std::shared_ptr<IRowInputFormat>
+        [](ReadBuffer & buf, const Block & sample, IRowInputFormat::Params params, const FormatSettings & settings)
+            -> std::shared_ptr<IRowInputFormat>
         {
             if (settings.schema.kafka_schema_registry_url.empty())
                 return std::make_shared<ProtobufRowInputFormat>(
                     buf, sample, std::move(params), FormatSchemaInfo(settings, "Protobuf", true), /*with_length_delimiter=*/false);
 
             if (!settings.schema.format_schema.empty())
-                throw Exception(ErrorCodes::INVALID_SETTING_VALUE, "kafka_schema_registry_url and format_schema cannot be used at the same time");
+                throw Exception(
+                    ErrorCodes::INVALID_SETTING_VALUE, "kafka_schema_registry_url and format_schema cannot be used at the same time");
 
-            return std::make_shared<ProtobufConfluentRowInputFormat>(
-                buf, sample, std::move(params), settings);
+            return std::make_shared<ProtobufConfluentRowInputFormat>(buf, sample, std::move(params), settings);
         });
 
     factory.registerInputFormat(
-        "Protobuf",
-        [](ReadBuffer & buf, const Block & sample, IRowInputFormat::Params params, const FormatSettings & settings)
+        "Protobuf", [](ReadBuffer & buf, const Block & sample, IRowInputFormat::Params params, const FormatSettings & settings)
         {
             return std::make_shared<ProtobufRowInputFormat>(
                 buf, sample, std::move(params), FormatSchemaInfo(settings, "Protobuf", true), /*with_length_delimiter=*/true);
         });
     /// proton: ends
-
 }
@@ -148,7 +148,7 @@ ProtobufSchemaReader::ProtobufSchemaReader(const FormatSettings & format_settings)
 using ConfluentSchemaRegistry = ProtobufConfluentRowInputFormat::SchemaRegistryWithCache;
 
 auto & schemaRegistryCache()
 {
     /// Cache of Schema Registry URL + credentials -> SchemaRegistry
-    static LRUCache<String,ConfluentSchemaRegistry> schema_registry_cache(/*max_size_=*/1000);
+    static LRUCache<String, ConfluentSchemaRegistry> schema_registry_cache(/*max_size_=*/1000);
     return schema_registry_cache;
 }
@@ -156,22 +156,15 @@ std::shared_ptr<ConfluentSchemaRegistry> getConfluentSchemaRegistry(const String
 {
     auto [schema_registry, loaded] = schemaRegistryCache().getOrSet(
         base_url + credentials,
-        [&base_url, &credentials]()
-        {
-            return std::make_shared<ConfluentSchemaRegistry>(base_url, credentials);
-        }
-    );
+        [&base_url, &credentials]() { return std::make_shared<ConfluentSchemaRegistry>(base_url, credentials); });
     return schema_registry;
 }
 
 }
 
-class ProtobufConfluentRowInputFormat::SchemaRegistryWithCache: public google::protobuf::io::ErrorCollector
+class ProtobufConfluentRowInputFormat::SchemaRegistryWithCache : public google::protobuf::io::ErrorCollector
 {
 public:
-    SchemaRegistryWithCache(const String & base_url, const String & credentials)
-        : registry(base_url, credentials)
-    {
-    }
+    SchemaRegistryWithCache(const String & base_url, const String & credentials) : registry(base_url, credentials) { }
 
     /// Overrides google::protobuf::io::ErrorCollector.
     void AddError(int line, google::protobuf::io::ColumnNumber column, const std::string & message) override
@@ -196,7 +189,12 @@ class ProtobufConfluentRowInputFormat::SchemaRegistryWithCache: public google::p
         for (auto i : std::span(indexes.begin() + 1, indexes.end()))
         {
             if (i > descriptor->nested_type_count())
-                throw Exception(ErrorCodes::INVALID_DATA, "Invalid message index={} max_index={} descriptor={}", i, descriptor->nested_type_count(), descriptor->name());
+                throw Exception(
+                    ErrorCodes::INVALID_DATA,
+                    "Invalid message index={} max_index={} descriptor={}",
+                    i,
+                    descriptor->nested_type_count(),
+                    descriptor->name());
 
             descriptor = descriptor->nested_type(i);
         }
@@ -243,9 +241,10 @@ class ProtobufConfluentRowInputFormat::SchemaRegistryWithCache: public google::p
 };
 
 ProtobufConfluentRowInputFormat::ProtobufConfluentRowInputFormat(
-        ReadBuffer & in_, const Block & header_, Params params_, const FormatSettings & format_settings_)
+    ReadBuffer & in_, const Block & header_, Params params_, const FormatSettings & format_settings_)
     : IRowInputFormat(header_, in_, params_, ProcessorID::ProtobufRowInputFormatID)
-    , registry(getConfluentSchemaRegistry(format_settings_.schema.kafka_schema_registry_url, format_settings_.schema.kafka_schema_registry_credentials))
+    , registry(getConfluentSchemaRegistry(
+          format_settings_.schema.kafka_schema_registry_url, format_settings_.schema.kafka_schema_registry_credentials))
 {
 }
@@ -279,7 +278,7 @@ bool ProtobufConfluentRowInputFormat::readRow(MutableColumns & columns, RowReadE
 
     const auto & header = getPort().getHeader();
 
-    ProtobufReader reader {*in};
+    ProtobufReader reader{*in};
     serializer = ProtobufSerializer::create(
         header.getNames(),
        header.getDataTypes(),
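As background to the cache code reformatted above: getOrSet either returns the registry client already cached for the URL-plus-credentials key or constructs one and remembers it, so all InputFormat instances share one client per registry. A simplified sketch of that shape follows; Registry and this getOrSet are stand-ins, not the real LRUCache API (which also handles eviction, locking, and concurrent loads):

#include <functional>
#include <map>
#include <memory>
#include <string>

struct Registry { std::string base_url, credentials; };

/// Simplified stand-in for LRUCache::getOrSet: returns the cached entry
/// or creates it with `load` and remembers it (no eviction, no locking).
std::shared_ptr<Registry> getOrSet(const std::string & key, const std::function<std::shared_ptr<Registry>()> & load)
{
    static std::map<std::string, std::shared_ptr<Registry>> cache;
    auto it = cache.find(key);
    if (it != cache.end())
        return it->second;
    return cache.emplace(key, load()).first->second;
}

std::shared_ptr<Registry> getRegistry(const std::string & base_url, const std::string & credentials)
{
    /// Note: a plain concatenation key can collide ("a" + "bc" == "ab" + "c");
    /// a separator that cannot appear in a URL avoids that.
    return getOrSet(base_url + '\n' + credentials,
                    [&] { return std::make_shared<Registry>(Registry{base_url, credentials}); });
}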