diff --git a/build.sh b/build.sh
index 4b50d027cce..1680edf7eb4 100755
--- a/build.sh
+++ b/build.sh
@@ -15,7 +15,7 @@ if [ -z "$sanitizer" ]; then
 fi
 
 cmake .. \
-    -DCMAKE_BUILD_TYPE=${build_type} \
+    -DCMAKE_BUILD_TYPE="$build_type" \
     -DENABLE_PROTON_ALL=OFF \
     -DENABLE_PROTON_SERVER=ON \
     -DENABLE_PROTON_CLIENT=ON \
@@ -73,5 +73,6 @@ cmake .. \
    -DENABLE_SNOWFLAKE_FUNCS=ON \
    -DENABLE_ENCRYPT_DECRYPT_FUNCS=ON \
    -DENABLE_DEBUG_FUNCS=ON \
-   -DENABLE_URL_FUNCS=ON
+   -DENABLE_URL_FUNCS=ON \
+   -DENABLE_AVRO=ON
diff --git a/docker/packager/packager b/docker/packager/packager
index 67bc8ed08e8..75612f65c38 100755
--- a/docker/packager/packager
+++ b/docker/packager/packager
@@ -219,6 +219,7 @@ def parse_env_variables(build_type, compiler, sanitizer, package_type, image_typ
         cmake_flags.append('-DENABLE_KRB5=ON')
         cmake_flags.append('-DENABLE_BROTLI=ON')
         cmake_flags.append('-DENABLE_S3=ON')
+        cmake_flags.append('-DENABLE_AVRO=ON')
 
     # krb5: Disabled in environments other than Linux and native Darwin.
    # Reference: contrib/krb5-cmake/CMakeLists.txt:3
diff --git a/src/Core/Settings.h b/src/Core/Settings.h
index 925ffa9e552..ffca52e88bc 100644
--- a/src/Core/Settings.h
+++ b/src/Core/Settings.h
@@ -769,6 +769,8 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
     M(EnumComparingMode, format_capn_proto_enum_comparising_mode, FormatSettings::EnumComparingMode::BY_VALUES, "How to map proton Enum and CapnProto Enum", 0)\
     M(String, rawstore_time_extraction_type, "", "_tp_time extraction type (string, json, regex)", 0) \
     M(String, rawstore_time_extraction_rule, "", "_tp_time extraction rule (string, json, regex)", 0) \
+    M(URI, kafka_schema_registry_url, "", "For Avro and ProtobufSingle formats: Kafka Schema Registry URL.", 0) \
+    M(String, kafka_schema_registry_credentials, "", "Credentials to be used to fetch the schema from the `kafka_schema_registry_url`, with format '<username>:<password>'.", 0) \
     /** proton: ends. */ // End of FORMAT_FACTORY_SETTINGS
     // Please add settings non-related to formats into the COMMON_SETTINGS above.
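Usage note: both new entries are ordinary query-level format settings. A minimal sketch of how they would be supplied (hypothetical stream name and registry endpoint, assuming the standard SETTINGS clause):

    SELECT * FROM kafka_events
    SETTINGS kafka_schema_registry_url = 'http://schema-registry:8081',
             kafka_schema_registry_credentials = 'alice:secret';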
diff --git a/src/Formats/FormatFactory.cpp b/src/Formats/FormatFactory.cpp
index 0e0b9c71ff8..ec5f24fb0c0 100644
--- a/src/Formats/FormatFactory.cpp
+++ b/src/Formats/FormatFactory.cpp
@@ -109,6 +109,10 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
     format_settings.schema.format_schema = settings.format_schema;
     format_settings.schema.format_schema_path = context->getFormatSchemaPath();
     format_settings.schema.is_server = context->hasGlobalContext() && (context->getGlobalContext()->getApplicationType() == Context::ApplicationType::SERVER);
+    /// proton: starts
+    format_settings.schema.kafka_schema_registry_url = settings.kafka_schema_registry_url.toString();
+    format_settings.schema.kafka_schema_registry_credentials = settings.kafka_schema_registry_credentials;
+    /// proton: ends
     format_settings.skip_unknown_fields = settings.input_format_skip_unknown_fields;
     format_settings.template_settings.resultset_format = settings.format_template_resultset;
     format_settings.template_settings.row_between_delimiter = settings.format_template_rows_between_delimiter;
diff --git a/src/Formats/FormatSchemaFactory.cpp b/src/Formats/FormatSchemaFactory.cpp
index dca1ee85a7b..615c205c87a 100644
--- a/src/Formats/FormatSchemaFactory.cpp
+++ b/src/Formats/FormatSchemaFactory.cpp
@@ -4,6 +4,8 @@
 #include
 #include
 
+#include <format>
+
 namespace DB
 {
@@ -18,24 +20,12 @@ extern const int INVALID_DATA;
 
 namespace
 {
+
 String basename(const std::filesystem::path & path) { return path.filename().replace_extension().string(); }
 
-String formatSchemaValidationErrors(SchemaValidationErrors errors)
-{
-    assert(!errors.empty());
-    String ret;
-    for (size_t i = 0; i < errors.size(); ++i)
-    {
-        if (i > 0)
-            ret.append("; ");
-        ret.append(fmt::format("line: {}, columns: {}, error: {}", errors[i].line, errors[i].col, errors[i].error));
-    }
-
-    return ret;
-}
 }
 
 void FormatSchemaFactory::registerSchema(const String & schema_name, const String & format, std::string_view schema_body, ExistsOP exists_op, ContextPtr & context)
@@ -53,8 +43,17 @@ void FormatSchemaFactory::registerSchema(const String & schema_name, const Strin
     std::lock_guard lock(mutex);
     auto writer = FormatFactory::instance().getExternalSchemaWriter(format, schema_body, context, format_settings);
     assert(writer); /* confirmed with checkSchemaType */
-    if (auto errors = writer->validate(); !errors.empty())
-        throw Exception(ErrorCodes::INVALID_DATA, "Invalid Protobuf schema, errors: {}", formatSchemaValidationErrors(errors));
+
+    try
+    {
+        writer->validate();
+    }
+    catch (DB::Exception & e)
+    {
+        e.addMessage(std::format("{} schema {} was invalid", format, schema_name));
+        e.rethrow();
+    }
+
     auto result = writer->write(exists_op == ExistsOP::Replace);
     if (!result && exists_op == ExistsOP::Throw)
diff --git a/src/Formats/FormatSettings.h b/src/Formats/FormatSettings.h
index 08ff2975483..0eb7546ed5a 100644
--- a/src/Formats/FormatSettings.h
+++ b/src/Formats/FormatSettings.h
@@ -207,6 +207,10 @@ struct FormatSettings
         std::string format_schema;
         std::string format_schema_path;
         bool is_server = false;
+        /// proton: starts
+        std::string kafka_schema_registry_url;
+        std::string kafka_schema_registry_credentials;
+        /// proton: ends
     } schema;
 
     struct
diff --git a/src/Formats/KafkaSchemaRegistry.cpp b/src/Formats/KafkaSchemaRegistry.cpp
new file mode 100644
index 00000000000..a90e7d774ec
--- /dev/null
+++ b/src/Formats/KafkaSchemaRegistry.cpp
@@ -0,0 +1,115 @@
+#include <Formats/KafkaSchemaRegistry.h>
+#include <IO/ConnectionTimeouts.h>
+#include <IO/HTTPCommon.h>
+#include <IO/ReadHelpers.h>
+
+#include <Poco/JSON/Parser.h>
+#include <Poco/Net/HTTPRequest.h>
+#include <Poco/Net/HTTPResponse.h>
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+extern const int CANNOT_READ_ALL_DATA;
+extern const int INCORRECT_DATA;
+}
+
+KafkaSchemaRegistry::KafkaSchemaRegistry(const String & base_url_, const String & credentials_)
+    : base_url(base_url_)
+    , logger(&Poco::Logger::get("KafkaSchemaRegistry"))
+{
+    assert(!base_url.empty());
+
+    if (auto pos = credentials_.find(':'); pos == credentials_.npos)
+        credentials.setUsername(credentials_);
+    else
+    {
+        credentials.setUsername(credentials_.substr(0, pos));
+        credentials.setPassword(credentials_.substr(pos + 1));
+    }
+}
+
+String KafkaSchemaRegistry::fetchSchema(UInt32 id)
+{
+    try
+    {
+        try
+        {
+            Poco::URI url(base_url, std::format("/schemas/ids/{}", id));
+            LOG_TRACE(logger, "Fetching schema id = {}", id);
+
+            /// One second for connect/send/receive. Just in case.
+            ConnectionTimeouts timeouts({1, 0}, {1, 0}, {1, 0});
+
+            Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_GET, url.getPathAndQuery(), Poco::Net::HTTPRequest::HTTP_1_1);
+            request.setHost(url.getHost());
+
+            if (!credentials.empty())
+                credentials.authenticate(request);
+
+            auto session = makePooledHTTPSession(url, timeouts, 1);
+            std::istream * response_body{};
+            try
+            {
+                session->sendRequest(request);
+
+                Poco::Net::HTTPResponse response;
+                response_body = receiveResponse(*session, request, response, false);
+            }
+            catch (const Poco::Exception & e)
+            {
+                /// We use the session's data storage to store the exception text.
+                /// Depending on it, we can decide whether to reconnect the session or re-resolve the session host.
+                session->attachSessionData(e.message());
+                throw;
+            }
+            Poco::JSON::Parser parser;
+            auto json_body = parser.parse(*response_body).extract<Poco::JSON::Object::Ptr>();
+            auto schema = json_body->getValue<String>("schema");
+            LOG_TRACE(logger, "Successfully fetched schema id = {}\n{}", id, schema);
+            return schema;
+        }
+        catch (const Exception &)
+        {
+            throw;
+        }
+        catch (const Poco::Exception & e)
+        {
+            throw Exception(Exception::CreateFromPocoTag{}, e);
+        }
+    }
+    catch (Exception & e)
+    {
+        e.addMessage(std::format("while fetching schema with id {}", id));
+        throw;
+    }
+}
+
+UInt32 KafkaSchemaRegistry::readSchemaId(ReadBuffer & in)
+{
+    uint8_t magic;
+    uint32_t schema_id;
+
+    try
+    {
+        readBinaryBigEndian(magic, in);
+        readBinaryBigEndian(schema_id, in);
+    }
+    catch (const Exception & e)
+    {
+        if (e.code() == ErrorCodes::CANNOT_READ_ALL_DATA)
+            /// empty or incomplete message without magic byte or schema id
+            throw Exception(ErrorCodes::INCORRECT_DATA, "Missing magic byte or schema identifier.");
+        else
+            throw;
+    }
+
+    if (magic != 0x00)
+        throw Exception(ErrorCodes::INCORRECT_DATA, "Invalid magic byte before schema identifier."
+            " Must be zero byte, found 0x{:x} instead", magic);
+
+    return schema_id;
+}
+
+}
diff --git a/src/Formats/KafkaSchemaRegistry.h b/src/Formats/KafkaSchemaRegistry.h
new file mode 100644
index 00000000000..db6b87794ac
--- /dev/null
+++ b/src/Formats/KafkaSchemaRegistry.h
@@ -0,0 +1,29 @@
+#pragma once
+
+#include <IO/ReadBuffer.h>
+
+#include <Poco/Net/HTTPBasicCredentials.h>
+#include <Poco/URI.h>
+
+namespace DB
+{
+
+/// A helper class for working with a Kafka schema registry.
+class KafkaSchemaRegistry final
+{
+public:
+    static UInt32 readSchemaId(ReadBuffer & in);
+
+    /// \param credentials_ is expected to be formatted as "<username>:<password>".
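+    /// For example "alice:secret" (hypothetical values). If there is no ':' in the string,
+    /// the whole value is used as the username and the password is left empty.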
+    KafkaSchemaRegistry(const String & base_url_, const String & credentials_);
+
+    String fetchSchema(UInt32 id);
+
+private:
+    Poco::URI base_url;
+    Poco::Net::HTTPBasicCredentials credentials;
+
+    Poco::Logger * logger;
+};
+
+}
diff --git a/src/Formats/ProtobufSchemas.cpp b/src/Formats/ProtobufSchemas.cpp
index dbf70489814..a8182ac9392 100644
--- a/src/Formats/ProtobufSchemas.cpp
+++ b/src/Formats/ProtobufSchemas.cpp
@@ -1,11 +1,13 @@
 #include "config.h"
 
 #if USE_PROTOBUF
+#    include
+#    include
 #    include
-#    include
+#    include
 #    include
+#    include
 #    include
-#    include
 
 namespace DB
 {
@@ -14,32 +16,7 @@ namespace ErrorCodes
 {
     extern const int BAD_ARGUMENTS;
     extern const int CANNOT_PARSE_PROTOBUF_SCHEMA;
-    /// proton: starts
-    extern const int INVALID_DATA;
-    /// proton: ends
-}
-
-/// proton: starts
-namespace
-{
-class ErrorCollector final: public google::protobuf::io::ErrorCollector
-{
-private:
-    SchemaValidationErrors errors;
-
-public:
-    void AddError(int line, google::protobuf::io::ColumnNumber column, const std::string & message) override
-    {
-        errors.emplace_back(line, column, message);
-    }
-
-    const SchemaValidationErrors & getErrors() const
-    {
-        return errors;
-    }
-};
 }
-/// proton: starts
 
 ProtobufSchemas & ProtobufSchemas::instance()
 {
@@ -106,17 +83,22 @@ const google::protobuf::Descriptor * ProtobufSchemas::getMessageTypeForFormatSch
 }
 
 /// proton: starts
-SchemaValidationErrors ProtobufSchemas::validateSchema(std::string_view schema)
+/// Overrides google::protobuf::io::ErrorCollector:
+void ProtobufSchemas::AddError(int line, google::protobuf::io::ColumnNumber column, const std::string & message)
+{
+    throw Exception(ErrorCodes::CANNOT_PARSE_PROTOBUF_SCHEMA,
+        "Cannot parse schema, found an error at line {}, column {}, error: {}", line, column, message);
+}
+
+void ProtobufSchemas::validateSchema(std::string_view schema)
 {
     google::protobuf::io::ArrayInputStream input{schema.data(), static_cast<int>(schema.size())};
-    ErrorCollector error_collector;
-    google::protobuf::io::Tokenizer tokenizer(&input, &error_collector);
+    google::protobuf::io::Tokenizer tokenizer(&input, this);
     google::protobuf::FileDescriptorProto descriptor;
     google::protobuf::compiler::Parser parser;
-    parser.RecordErrorsTo(&error_collector);
+    parser.RecordErrorsTo(this);
     parser.Parse(&tokenizer, &descriptor);
-
-    return error_collector.getErrors();
 }
 
 /// proton: ends
diff --git a/src/Formats/ProtobufSchemas.h b/src/Formats/ProtobufSchemas.h
index 187b6b6c84e..aa12ce32a3b 100644
--- a/src/Formats/ProtobufSchemas.h
+++ b/src/Formats/ProtobufSchemas.h
@@ -9,6 +9,13 @@
 #include
 #include
 
+/// proton: starts
+#include <string_view>
+
+#include <google/protobuf/descriptor.h>
+#include <google/protobuf/io/tokenizer.h>
+/// proton: ends
+
 namespace google
 {
@@ -21,28 +28,28 @@ namespace protobuf
 
 namespace DB
 {
 class FormatSchemaInfo;
-/// proton: starts
-struct SchemaValidationError;
-using SchemaValidationErrors = std::vector<SchemaValidationError>;
-/// proton: ends
 
 /** Keeps parsed google protobuf schemas parsed from files.
   * This class is used to handle the "Protobuf" input/output formats.
   */
-class ProtobufSchemas : private boost::noncopyable
+class ProtobufSchemas : public google::protobuf::io::ErrorCollector /* proton: updated */
 {
 public:
     static ProtobufSchemas & instance();
 
     ProtobufSchemas();
-    ~ProtobufSchemas();
+    ~ProtobufSchemas() override; /* proton: updated */
 
     /// Parses the format schema, then parses the corresponding proto file, and returns the descriptor of the message type.
     /// The function never returns nullptr, it throws an exception if it cannot load or parse the file.
     const google::protobuf::Descriptor * getMessageTypeForFormatSchema(const FormatSchemaInfo & info);
 
     /// proton: starts
-    SchemaValidationErrors validateSchema(std::string_view schema);
+    void AddError(int line, google::protobuf::io::ColumnNumber column, const std::string & message) override;
+
+    /// Validates the given schema and throws a DB::Exception if the schema is invalid.
+    /// The exception carries only the first error encountered during validation, i.e. there could be more errors.
+    void validateSchema(std::string_view schema);
     /// proton: ends
 
 private:
     class ImporterWithSourceTree;
diff --git a/src/KafkaLog/KafkaWALSettings.h b/src/KafkaLog/KafkaWALSettings.h
index 484e59a8ecd..ef81ed35076 100644
--- a/src/KafkaLog/KafkaWALSettings.h
+++ b/src/KafkaLog/KafkaWALSettings.h
@@ -146,6 +146,7 @@ struct KafkaWALSettings
         settings.push_back(fmt::format("shared_subscription_flush_threshold_bytes={}", shared_subscription_flush_threshold_bytes));
         settings.push_back(fmt::format("shared_subscription_flush_threshold_ms={}", shared_subscription_flush_threshold_ms));
         settings.push_back(fmt::format("auth.security.protocol={}", auth.security_protocol));
+        settings.push_back(fmt::format("auth.sasl.mechanism={}", auth.sasl_mechanism));
         settings.push_back(fmt::format("auth.username={}", auth.username));
         settings.push_back(fmt::format("auth.password={}", auth.password));
         settings.push_back(fmt::format("auth.ssl.ca.location={}", auth.ssl_ca_cert_file));
diff --git a/src/Processors/Formats/ISchemaWriter.h b/src/Processors/Formats/ISchemaWriter.h
index 93f02a744b6..6fb6190fa36 100644
--- a/src/Processors/Formats/ISchemaWriter.h
+++ b/src/Processors/Formats/ISchemaWriter.h
@@ -6,20 +6,6 @@
 
 namespace DB
 {
-struct SchemaValidationError final
-{
-    /// On which line the error happens. If line < 0, it indicates the error is not related to specific line,
-    /// or the validator is unable to provide such information.
-    int line;
-    /// At which column the error happens. If col < 0, it indicates the error is not related to specific column,
-    /// or the validator is unable to provide such information.
-    int col;
-    /// Error message
-    String error;
-};
-
-using SchemaValidationErrors = std::vector<SchemaValidationError>;
-
 class IExternalSchemaWriter
 {
 public:
@@ -30,8 +16,8 @@ class IExternalSchemaWriter
 
     virtual ~IExternalSchemaWriter() = default;
 
-    /// Validates the schema input.
-    virtual SchemaValidationErrors validate() = 0;
+    /// Validates the schema input. Should throw exceptions on validation failures.
+    virtual void validate() = 0;
 
     /// Persistents the schema.
     /// If the schema already exists, and replace_if_exist is false, it returns false.
diff --git a/src/Processors/Formats/Impl/AvroRowInputFormat.cpp b/src/Processors/Formats/Impl/AvroRowInputFormat.cpp
index 9759aee830b..23e57025a85 100644
--- a/src/Processors/Formats/Impl/AvroRowInputFormat.cpp
+++ b/src/Processors/Formats/Impl/AvroRowInputFormat.cpp
@@ -65,6 +65,10 @@
 #include
 #include
 
+/// proton: starts
+#include <Formats/KafkaSchemaRegistry.h>
+#include <format>
+/// proton: ends
 
 namespace DB
 {
@@ -78,6 +82,10 @@ namespace ErrorCodes
     extern const int TYPE_MISMATCH;
     extern const int CANNOT_PARSE_UUID;
     extern const int CANNOT_READ_ALL_DATA;
+
+    /// proton: starts
+    extern const int INVALID_SETTING_VALUE;
+    /// proton: ends
 }
 
 class InputStreamReadBufferAdapter : public avro::InputStream
@@ -641,14 +649,16 @@ bool AvroRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &ext
     return false;
 }
 
+/// proton: starts
+/// This class is refactored from the original implementation: the schema-registry-related code was extracted
+/// into KafkaSchemaRegistry, so that other formats that can be used with a Kafka schema registry can share it.
 class AvroConfluentRowInputFormat::SchemaRegistry
 {
 public:
-    explicit SchemaRegistry(const std::string & base_url_, size_t schema_cache_max_size = 1000)
-        : base_url(base_url_), schema_cache(schema_cache_max_size)
+    explicit SchemaRegistry(const std::string & base_url, const std::string & credentials, size_t schema_cache_max_size = 1000)
+        : registry(base_url, credentials)
+        , schema_cache(schema_cache_max_size)
     {
-        if (base_url.empty())
-            throw Exception("Empty Schema Registry URL", ErrorCodes::BAD_ARGUMENTS);
     }
 
     avro::ValidSchema getSchema(uint32_t id)
@@ -663,64 +673,23 @@ class AvroConfluentRowInputFormat::SchemaRegistry
 private:
     avro::ValidSchema fetchSchema(uint32_t id)
     {
+        auto schema = registry.fetchSchema(id);
         try
         {
-            try
-            {
-                Poco::URI url(base_url, "/schemas/ids/" + std::to_string(id));
-                LOG_TRACE((&Poco::Logger::get("AvroConfluentRowInputFormat")), "Fetching schema id = {}", id);
-
-                /// One second for connect/send/receive. Just in case.
-                ConnectionTimeouts timeouts({1, 0}, {1, 0}, {1, 0});
-
-                Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_GET, url.getPathAndQuery(), Poco::Net::HTTPRequest::HTTP_1_1);
-                request.setHost(url.getHost());
-
-                auto session = makePooledHTTPSession(url, timeouts, 1);
-                std::istream * response_body{};
-                try
-                {
-                    session->sendRequest(request);
-
-                    Poco::Net::HTTPResponse response;
-                    response_body = receiveResponse(*session, request, response, false);
-                }
-                catch (const Poco::Exception & e)
-                {
-                    /// We use session data storage as storage for exception text
-                    /// Depend on it we can deduce to reconnect session or reresolve session host
-                    session->attachSessionData(e.message());
-                    throw;
-                }
-                Poco::JSON::Parser parser;
-                auto json_body = parser.parse(*response_body).extract<Poco::JSON::Object::Ptr>();
-                auto schema = json_body->getValue<std::string>("schema");
-                LOG_TRACE((&Poco::Logger::get("AvroConfluentRowInputFormat")), "Successfully fetched schema id = {}\n{}", id, schema);
-                return avro::compileJsonSchemaFromString(schema);
-            }
-            catch (const Exception &)
-            {
-                throw;
-            }
-            catch (const Poco::Exception & e)
-            {
-                throw Exception(Exception::CreateFromPocoTag{}, e);
-            }
-            catch (const avro::Exception & e)
-            {
-                throw Exception(e.what(), ErrorCodes::INCORRECT_DATA);
-            }
+            return avro::compileJsonSchemaFromString(schema);
         }
-        catch (Exception & e)
+        catch (const avro::Exception & e)
         {
-            e.addMessage("while fetching schema id = " + std::to_string(id));
-            throw;
+            auto ex = Exception(e.what(), ErrorCodes::INCORRECT_DATA);
+            ex.addMessage(std::format("while fetching schema id = {}", id));
+            throw std::move(ex);
         }
     }
 
-    Poco::URI base_url;
+    KafkaSchemaRegistry registry;
     LRUCache<uint32_t, avro::ValidSchema> schema_cache;
 };
+/// proton: ends
 
 using ConfluentSchemaRegistry = AvroConfluentRowInputFormat::SchemaRegistry;
 #define SCHEMA_REGISTRY_CACHE_MAX_SIZE 1000
@@ -729,57 +698,28 @@ static LRUCache<std::string, ConfluentSchemaRegistry> schema_registry_cache(SCH
 static std::shared_ptr<ConfluentSchemaRegistry> getConfluentSchemaRegistry(const FormatSettings & format_settings)
 {
-    const auto & base_url = format_settings.avro.schema_registry_url;
+    /// proton: starts
+    const auto & base_url = format_settings.schema.kafka_schema_registry_url.empty() ? format_settings.avro.schema_registry_url : format_settings.schema.kafka_schema_registry_url;
+    const auto & credentials = format_settings.schema.kafka_schema_registry_credentials;
     auto [schema_registry, loaded] = schema_registry_cache.getOrSet(
-        base_url,
-        [base_url]()
+        base_url + credentials,
+        [&base_url, &credentials]()
         {
-            return std::make_shared<ConfluentSchemaRegistry>(base_url);
+            return std::make_shared<ConfluentSchemaRegistry>(base_url, credentials);
         }
     );
+    /// proton: ends
     return schema_registry;
 }
 
-static uint32_t readConfluentSchemaId(ReadBuffer & in)
-{
-    uint8_t magic;
-    uint32_t schema_id;
-
-    try
-    {
-        readBinaryBigEndian(magic, in);
-        readBinaryBigEndian(schema_id, in);
-    }
-    catch (const Exception & e)
-    {
-        if (e.code() == ErrorCodes::CANNOT_READ_ALL_DATA)
-        {
-            /* empty or incomplete message without Avro Confluent magic number or schema id */
-            throw Exception("Missing AvroConfluent magic byte or schema identifier.", ErrorCodes::INCORRECT_DATA);
-        }
-        else
-            throw;
-    }
-
-    if (magic != 0x00)
-    {
-        throw Exception("Invalid magic byte before AvroConfluent schema identifier."
- " Must be zero byte, found " + std::to_string(int(magic)) + " instead", ErrorCodes::INCORRECT_DATA); - } - - return schema_id; -} - AvroConfluentRowInputFormat::AvroConfluentRowInputFormat( const Block & header_, ReadBuffer & in_, Params params_, const FormatSettings & format_settings_) : IRowInputFormat(header_, in_, params_, ProcessorID::AvroConfluentRowInputFormatID) , schema_registry(getConfluentSchemaRegistry(format_settings_)) - , input_stream(std::make_unique(*in)) , decoder(avro::binaryDecoder()) , format_settings(format_settings_) { - decoder->init(*input_stream); } bool AvroConfluentRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & ext) @@ -793,8 +733,14 @@ bool AvroConfluentRowInputFormat::readRow(MutableColumns & columns, RowReadExten { return false; } - SchemaId schema_id = readConfluentSchemaId(*in); + + /// proton: starts + SchemaId schema_id = KafkaSchemaRegistry::readSchemaId(*in); const auto & deserializer = getOrCreateDeserializer(schema_id); + InputStreamReadBufferAdapter is {*in}; + decoder->init(is); + /// proton: ends + deserializer.deserializeRow(columns, *decoder, ext); decoder->drain(); return true; @@ -828,7 +774,7 @@ NamesAndTypesList AvroSchemaReader::readSchema() avro::NodePtr root_node; if (confluent) { - UInt32 schema_id = readConfluentSchemaId(in); + UInt32 schema_id = KafkaSchemaRegistry::readSchemaId(in); root_node = getConfluentSchemaRegistry(format_settings)->getSchema(schema_id).root(); } else @@ -910,18 +856,20 @@ void registerInputFormatAvro(FormatFactory & factory) ReadBuffer & buf, const Block & sample, const RowInputFormatParams & params, - const FormatSettings & settings) + const FormatSettings & settings) -> InputFormatPtr { - return std::make_shared(sample, buf, params, settings); - }); + /// proton: starts + /// Use only one format name "Avro" to support both schema registry and non-schema registry use cases, rather than using another name "AvroConfluent", + /// which is what ClickHouse did. 
- factory.registerInputFormat("AvroConfluent",[]( - ReadBuffer & buf, - const Block & sample, - const RowInputFormatParams & params, - const FormatSettings & settings) - { + if (settings.avro.schema_registry_url.empty() && settings.schema.kafka_schema_registry_url.empty()) + /// Non-schema registry case + return std::make_shared(sample, buf, params, settings); + + if (!settings.schema.format_schema.empty()) + throw Exception(ErrorCodes::INVALID_SETTING_VALUE, "schema_registry_url and format_schema cannot be used at the same time"); return std::make_shared(sample, buf, params, settings); + /// proton: ends }); } diff --git a/src/Processors/Formats/Impl/AvroRowInputFormat.h b/src/Processors/Formats/Impl/AvroRowInputFormat.h index 2ec3fdace8f..3e333682599 100644 --- a/src/Processors/Formats/Impl/AvroRowInputFormat.h +++ b/src/Processors/Formats/Impl/AvroRowInputFormat.h @@ -145,7 +145,7 @@ class AvroConfluentRowInputFormat final : public IRowInputFormat class SchemaRegistry; private: - virtual bool readRow(MutableColumns & columns, RowReadExtension & ext) override; + bool readRow(MutableColumns & columns, RowReadExtension & ext) override; /* proton: updated */ bool allowSyncAfterError() const override { return true; } void syncAfterError() override; @@ -155,7 +155,6 @@ class AvroConfluentRowInputFormat final : public IRowInputFormat std::unordered_map deserializer_cache; const AvroDeserializer & getOrCreateDeserializer(SchemaId schema_id); - avro::InputStreamPtr input_stream; avro::DecoderPtr decoder; FormatSettings format_settings; }; diff --git a/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp b/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp index 454bf5236f1..77601fcc87f 100644 --- a/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp @@ -7,15 +7,34 @@ # include # include # include -# include # include +# include # include +/// proton: starts +# include +# include +# include +# include + +# include +# include +# include +/// proton: ends namespace DB { -ProtobufRowInputFormat::ProtobufRowInputFormat(ReadBuffer & in_, const Block & header_, const Params & params_, const FormatSchemaInfo & schema_info_, bool with_length_delimiter_) +/// proton: starts +namespace ErrorCodes +{ +extern const int INVALID_DATA; +extern const int INVALID_SETTING_VALUE; +} +/// proton: ends + +ProtobufRowInputFormat::ProtobufRowInputFormat( + ReadBuffer & in_, const Block & header_, const Params & params_, const FormatSchemaInfo & schema_info_, bool with_length_delimiter_) : IRowInputFormat(header_, in_, params_, ProcessorID::ProtobufRowInputFormatID) , reader(std::make_unique(in_)) , serializer(ProtobufSerializer::create( @@ -24,7 +43,7 @@ ProtobufRowInputFormat::ProtobufRowInputFormat(ReadBuffer & in_, const Block & h missing_column_indices, *ProtobufSchemas::instance().getMessageTypeForFormatSchema(schema_info_), with_length_delimiter_, - *reader)) + *reader)) { } @@ -68,19 +87,41 @@ void ProtobufRowInputFormat::syncAfterError() void registerInputFormatProtobuf(FormatFactory & factory) { - for (bool with_length_delimiter : {false, true}) - { - factory.registerInputFormat(with_length_delimiter ? "Protobuf" : "ProtobufSingle", [with_length_delimiter]( - ReadBuffer & buf, - const Block & sample, - IRowInputFormat::Params params, - const FormatSettings & settings) + /// proton: starts + /// for (bool with_length_delimiter : {false, true}) + /// { + /// factory.registerInputFormat( + /// with_length_delimiter ? 
"Protobuf" : "ProtobufSingle", + /// [with_length_delimiter]( + /// ReadBuffer & buf, const Block & sample, IRowInputFormat::Params params, const FormatSettings & settings) { + /// return std::make_shared( + /// buf, sample, std::move(params), FormatSchemaInfo(settings, "Protobuf", true), with_length_delimiter); + /// }); + /// } + + factory.registerInputFormat( + "ProtobufSingle", + [](ReadBuffer & buf, const Block & sample, IRowInputFormat::Params params, const FormatSettings & settings) + -> std::shared_ptr { - return std::make_shared(buf, sample, std::move(params), - FormatSchemaInfo(settings, "Protobuf", true), - with_length_delimiter); + if (settings.schema.kafka_schema_registry_url.empty()) + return std::make_shared( + buf, sample, std::move(params), FormatSchemaInfo(settings, "Protobuf", true), /*with_length_delimiter=*/false); + + if (!settings.schema.format_schema.empty()) + throw Exception( + ErrorCodes::INVALID_SETTING_VALUE, "kafka_schema_registry_url and format_schema cannot be used at the same time"); + + return std::make_shared(buf, sample, std::move(params), settings); }); - } + + factory.registerInputFormat( + "Protobuf", [](ReadBuffer & buf, const Block & sample, IRowInputFormat::Params params, const FormatSettings & settings) + { + return std::make_shared( + buf, sample, std::move(params), FormatSchemaInfo(settings, "Protobuf", true), /*with_length_delimiter=*/true); + }); + /// proton: ends } ProtobufSchemaReader::ProtobufSchemaReader(const FormatSettings & format_settings) @@ -100,20 +141,177 @@ NamesAndTypesList ProtobufSchemaReader::readSchema() } /// proton: starts +namespace +{ +using ConfluentSchemaRegistry = ProtobufConfluentRowInputFormat::SchemaRegistryWithCache; + +auto & schemaRegistryCache() +{ + /// Cache of Schema Registry URL + credentials -> SchemaRegistry + static LRUCache schema_registry_cache(/*max_size_=*/1000); + return schema_registry_cache; +} + +std::shared_ptr getConfluentSchemaRegistry(const String & base_url, const String & credentials) +{ + auto [schema_registry, loaded] = schemaRegistryCache().getOrSet( + base_url + credentials, + [&base_url, &credentials]() { return std::make_shared(base_url, credentials); }); + return schema_registry; +} +} + +class ProtobufConfluentRowInputFormat::SchemaRegistryWithCache : public google::protobuf::io::ErrorCollector +{ +public: + SchemaRegistryWithCache(const String & base_url, const String & credentials) : registry(base_url, credentials) { } + + /// Overrides google::protobuf::io::ErrorCollector. + void AddError(int line, google::protobuf::io::ColumnNumber column, const std::string & message) override + { + throw Exception(ErrorCodes::INVALID_DATA, "Failed to parse schema, line={}, column={}, message={}", line, column, message); + } + + const google::protobuf::Descriptor * getMessageType(uint32_t schema_id, const std::vector & indexes) + { + assert(!indexes.empty()); + + const auto * fd = getSchema(schema_id); + + /// Check https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format + /// for how to get the message with the indexes. 
+        auto mt_count = fd->message_type_count();
+        if (mt_count < indexes[0] + 1)
+            throw Exception(ErrorCodes::INVALID_DATA, "Invalid message index={} max_index={}", indexes[0], mt_count - 1);
+
+        const auto * descriptor = fd->message_type(indexes[0]);
+
+        for (auto i : std::span(indexes.begin() + 1, indexes.end()))
+        {
+            if (i >= descriptor->nested_type_count())
+                throw Exception(
+                    ErrorCodes::INVALID_DATA,
+                    "Invalid message index={} max_index={} descriptor={}",
+                    i,
+                    descriptor->nested_type_count() - 1,
+                    descriptor->name());
+            descriptor = descriptor->nested_type(i);
+        }
+
+        return descriptor;
+    }
+
+private:
+    const google::protobuf::FileDescriptor * getSchema(uint32_t id)
+    {
+        const auto * loaded_descriptor = descriptor_pool.FindFileByName(std::to_string(id));
+        if (loaded_descriptor)
+            return loaded_descriptor;
+
+        return fetchSchema(id);
+    }
+
+    const google::protobuf::FileDescriptor * fetchSchema(uint32_t id)
+    {
+        std::lock_guard lock(mutex);
+        /// Check again, in case another thread fetched the schema while we were waiting for the lock
+        const auto * loaded_descriptor = descriptor_pool.FindFileByName(std::to_string(id));
+        if (loaded_descriptor)
+            return loaded_descriptor;
+
+        auto schema = registry.fetchSchema(id);
+        google::protobuf::io::ArrayInputStream input{schema.data(), static_cast<int>(schema.size())};
+        google::protobuf::io::Tokenizer tokenizer(&input, this);
+        google::protobuf::FileDescriptorProto file_descriptor;
+        file_descriptor.set_name(std::to_string(id));
+        google::protobuf::compiler::Parser parser;
+        parser.RecordErrorsTo(this);
+        parser.Parse(&tokenizer, &file_descriptor);
+
+        auto const * descriptor = descriptor_pool.BuildFile(file_descriptor);
+        if (descriptor && descriptor->message_type_count() > 0)
+            return descriptor;
+
+        throw Exception(ErrorCodes::INVALID_DATA, "No message type in schema");
+    }
+
+    std::mutex mutex;
+    KafkaSchemaRegistry registry;
+    google::protobuf::DescriptorPool descriptor_pool;
+};
+
+ProtobufConfluentRowInputFormat::ProtobufConfluentRowInputFormat(
+    ReadBuffer & in_, const Block & header_, Params params_, const FormatSettings & format_settings_)
+    : IRowInputFormat(header_, in_, params_, ProcessorID::ProtobufRowInputFormatID)
+    , registry(getConfluentSchemaRegistry(
+          format_settings_.schema.kafka_schema_registry_url, format_settings_.schema.kafka_schema_registry_credentials))
+{
+}
+
+bool ProtobufConfluentRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & row_read_extension)
+{
+    if (in->eof())
+        return false;
+
+    auto schema_id = KafkaSchemaRegistry::readSchemaId(*in);
+
+    Int64 indexes_count = 0;
+    readVarInt(indexes_count, *in);
+
+    std::vector<Int64> indexes;
+    indexes.reserve(indexes_count < 1 ? 1 : indexes_count);
+
+    if (indexes_count < 1)
+    {
+        indexes.push_back(0);
+    }
+    else
+    {
+        for (Int64 i = 0; i < indexes_count; i++)
+        {
+            Int64 ind = 0;
+            readVarInt(ind, *in);
+            indexes.push_back(ind);
+        }
+    }
+
+    const auto & header = getPort().getHeader();
+
+    ProtobufReader reader{*in};
+    serializer = ProtobufSerializer::create(
+        header.getNames(),
+        header.getDataTypes(),
+        missing_column_indices,
+        *registry->getMessageType(schema_id, indexes),
+        /*with_length_delimiter=*/false,
+        reader);
+
+    size_t row_num = columns.empty() ? 0 : columns[0]->size();
+    /// The serializer is recreated for every row, so it must always be (re)bound to the output columns,
+    /// not only for the first row of the chunk.
+    serializer->setColumns(columns.data(), columns.size());
+
+    serializer->readRow(row_num);
+
+    assert(row_read_extension.read_columns.empty());
+    if (!missing_column_indices.empty())
+    {
+        row_read_extension.read_columns.resize(columns.size(), true);
+        for (size_t column_idx : missing_column_indices)
+            row_read_extension.read_columns[column_idx] = false;
+    }
+    return true;
+}
+
 ProtobufSchemaWriter::ProtobufSchemaWriter(std::string_view schema_body_, const FormatSettings & settings_)
     : IExternalSchemaWriter(schema_body_, settings_)
-    , schema_info(
-          settings.schema.format_schema,
-          "Protobuf",
-          false,
-          settings.schema.is_server,
-          settings.schema.format_schema_path)
+    , schema_info(settings.schema.format_schema, "Protobuf", false, settings.schema.is_server, settings.schema.format_schema_path)
 {
 }
 
-SchemaValidationErrors ProtobufSchemaWriter::validate()
+void ProtobufSchemaWriter::validate()
 {
-    return ProtobufSchemas::instance().validateSchema(schema_body);
+    ProtobufSchemas::instance().validateSchema(schema_body);
 }
 
 bool ProtobufSchemaWriter::write(bool replace_if_exist)
@@ -122,7 +320,7 @@ bool ProtobufSchemaWriter::write(bool replace_if_exist)
     if (!replace_if_exist)
         return false;
 
-    WriteBufferFromFile write_buffer {schema_info.absoluteSchemaPath()};
+    WriteBufferFromFile write_buffer{schema_info.absoluteSchemaPath()};
     write_buffer.write(schema_body.data(), schema_body.size());
     return true;
 }
@@ -130,24 +328,19 @@
 
 void registerProtobufSchemaReader(FormatFactory & factory)
 {
-    factory.registerExternalSchemaReader("Protobuf", [](const FormatSettings & settings)
-    {
-        return std::make_shared<ProtobufSchemaReader>(settings);
-    });
+    factory.registerExternalSchemaReader(
+        "Protobuf", [](const FormatSettings & settings) { return std::make_shared<ProtobufSchemaReader>(settings); });
 
     factory.registerFileExtension("pb", "Protobuf");
 
     /// proton: starts
     factory.registerSchemaFileExtension("proto", "Protobuf");
-    factory.registerExternalSchemaWriter("Protobuf", [](std::string_view schema_body, const FormatSettings & settings)
-    {
+    factory.registerExternalSchemaWriter("Protobuf", [](std::string_view schema_body, const FormatSettings & settings) {
        return std::make_shared<ProtobufSchemaWriter>(schema_body, settings);
     });
     /// proton: ends
 
-    factory.registerExternalSchemaReader("ProtobufSingle", [](const FormatSettings & settings)
-    {
-        return std::make_shared<ProtobufSchemaReader>(settings);
-    });
+    factory.registerExternalSchemaReader(
+        "ProtobufSingle", [](const FormatSettings & settings) { return std::make_shared<ProtobufSchemaReader>(settings); });
 
     for (const auto & name : {"Protobuf", "ProtobufSingle"})
         factory.registerAdditionalInfoForSchemaCacheGetter(
@@ -161,9 +354,13 @@
 namespace DB
 {
 class FormatFactory;
-void registerInputFormatProtobuf(FormatFactory &) {}
+void registerInputFormatProtobuf(FormatFactory &)
+{
+}
 
-void registerProtobufSchemaReader(FormatFactory &) {}
+void registerProtobufSchemaReader(FormatFactory &)
+{
+}
 }
 
 #endif
diff --git a/src/Processors/Formats/Impl/ProtobufRowInputFormat.h b/src/Processors/Formats/Impl/ProtobufRowInputFormat.h
index 907129881a4..9dc81b4afee 100644
--- a/src/Processors/Formats/Impl/ProtobufRowInputFormat.h
+++ b/src/Processors/Formats/Impl/ProtobufRowInputFormat.h
@@ -8,6 +8,10 @@
 #    include
 #    include
 
+/// proton: starts
+#    include
+/// proton: ends
+
 namespace DB
 {
 class Block;
@@ -30,7 +34,12 @@ class ProtobufSerializer;
 class ProtobufRowInputFormat final : public IRowInputFormat
 {
 public:
-    ProtobufRowInputFormat(ReadBuffer & in_, const Block & header_, const Params & params_, const FormatSchemaInfo & schema_info_, bool with_length_delimiter_);
+    ProtobufRowInputFormat(
+        ReadBuffer & in_,
+        const Block & header_,
+        const Params & params_,
+        const FormatSchemaInfo & schema_info_,
+        bool with_length_delimiter_);
     ~ProtobufRowInputFormat() override;
 
     String getName() const override { return "ProtobufRowInputFormat"; }
@@ -61,12 +70,36 @@ class ProtobufSchemaReader : public IExternalSchemaReader
 };
 
 /// proton: starts
+/// Confluent framing + Protobuf binary datum encoding. Mainly used for Kafka.
+/// Uses 3 caches:
+/// 1. global: schema registry cache (base_url + credentials -> SchemaRegistry)
+/// 2. SchemaRegistry: schema cache (schema_id -> schema)
+/// 3. ProtobufConfluentRowInputFormat: serializer (schema_id -> ProtobufSerializer)
+/// This is needed because KafkaStorage creates a new instance of InputFormat per batch of messages
+class ProtobufConfluentRowInputFormat final : public IRowInputFormat
+{
+public:
+    ProtobufConfluentRowInputFormat(ReadBuffer & in_, const Block & header_, Params params_, const FormatSettings & format_settings_);
+
+    String getName() const override { return "ProtobufConfluentRowInputFormat"; }
+
+    // void setReadBuffer(ReadBuffer & buf) override;
+
+    class SchemaRegistryWithCache;
+
+private:
+    bool readRow(MutableColumns & columns, RowReadExtension & row_read_extension) override;
+
+    std::shared_ptr<SchemaRegistryWithCache> registry;
+    std::vector<size_t> missing_column_indices;
+    std::unique_ptr<ProtobufSerializer> serializer;
+};
+
 class ProtobufSchemaWriter : public IExternalSchemaWriter
 {
 public:
     explicit ProtobufSchemaWriter(std::string_view schema_body_, const FormatSettings & settings_);
 
-    SchemaValidationErrors validate() override;
+    void validate() override;
     bool write(bool replace_if_exist) override;
 
 private:
diff --git a/tests/stream/test_stream_smoke/0033_format_schema.yaml b/tests/stream/test_stream_smoke/0033_format_schema.yaml
index 83ae747529b..59995f93964 100644
--- a/tests/stream/test_stream_smoke/0033_format_schema.yaml
+++ b/tests/stream/test_stream_smoke/0033_format_schema.yaml
@@ -144,13 +144,13 @@ tests:
       expected_results:
         - query_id: fmt-2-0 # unknown format type
-          expected_results: error_code:73
+          expected_results: error_code:2613
         - query_id: fmt-2-1 # empty body
           expected_results: error_code:36
         - query_id: fmt-2-2 # invalid schema
-          expected_results: error_code:2504
+          expected_results: error_code:434
         - query_id: fmt-2-3 # nonexists
           expected_results: error_code:2611