Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: support kafka schema registry for Protobuf and Avro. #579

Merged
merged 16 commits into from
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ if [ -z "$sanitizer" ]; then
fi

cmake .. \
-DCMAKE_BUILD_TYPE=${build_type} \
-DCMAKE_BUILD_TYPE="$build_type" \
-DENABLE_PROTON_ALL=OFF \
-DENABLE_PROTON_SERVER=ON \
-DENABLE_PROTON_CLIENT=ON \
Expand Down Expand Up @@ -73,5 +73,6 @@ cmake .. \
-DENABLE_SNOWFLAKE_FUNCS=ON \
-DENABLE_ENCRYPT_DECRYPT_FUNCS=ON \
-DENABLE_DEBUG_FUNCS=ON \
-DENABLE_URL_FUNCS=ON
-DENABLE_URL_FUNCS=ON \
-DENABLE_AVRO=ON

1 change: 1 addition & 0 deletions docker/packager/packager
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ def parse_env_variables(build_type, compiler, sanitizer, package_type, image_typ
cmake_flags.append('-DENABLE_KRB5=ON')
cmake_flags.append('-DENABLE_BROTLI=ON')
cmake_flags.append('-DENABLE_S3=ON')
cmake_flags.append('-DENABLE_AVRO=ON')

# krb5: Disabled in environments other than Linux and native Darwin.
# Reference: contrib/krb5-cmake/CMakeLists.txt:3
Expand Down
2 changes: 2 additions & 0 deletions src/Core/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -769,6 +769,8 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(EnumComparingMode, format_capn_proto_enum_comparising_mode, FormatSettings::EnumComparingMode::BY_VALUES, "How to map proton Enum and CapnProto Enum", 0)\
M(String, rawstore_time_extraction_type, "", "_tp_time extraction type (string, json, regex)", 0) \
M(String, rawstore_time_extraction_rule, "", "_tp_time extraction rule (string, json, regex)", 0) \
M(URI, kafka_schema_registry_url, "", "For ProtobufSingle format: Kafka Schema Registry URL.", 0) \
M(String, kafka_schema_registry_credentials, "", "Credentials to be used to fetch schema from the `kafka_schema_registry_url`, with format '<username>:<password>'.", 0) \
/** proton: ends. */
// End of FORMAT_FACTORY_SETTINGS
// Please add settings non-related to formats into the COMMON_SETTINGS above.
Expand Down
4 changes: 4 additions & 0 deletions src/Formats/FormatFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
format_settings.schema.format_schema = settings.format_schema;
format_settings.schema.format_schema_path = context->getFormatSchemaPath();
format_settings.schema.is_server = context->hasGlobalContext() && (context->getGlobalContext()->getApplicationType() == Context::ApplicationType::SERVER);
/// proton: starts
format_settings.schema.kafka_schema_registry_url = settings.kafka_schema_registry_url.toString();
format_settings.schema.kafka_schema_registry_credentials = settings.kafka_schema_registry_credentials;
/// proton: ends
format_settings.skip_unknown_fields = settings.input_format_skip_unknown_fields;
format_settings.template_settings.resultset_format = settings.format_template_resultset;
format_settings.template_settings.row_between_delimiter = settings.format_template_rows_between_delimiter;
Expand Down
29 changes: 14 additions & 15 deletions src/Formats/FormatSchemaFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
#include <IO/ReadBufferFromString.h>
#include <Processors/Formats/ISchemaWriter.h>

#include <format>

namespace DB
{

Expand All @@ -18,24 +20,12 @@ extern const int INVALID_DATA;

namespace
{

String basename(const std::filesystem::path & path)
{
return path.filename().replace_extension().string();
}

String formatSchemaValidationErrors(SchemaValidationErrors errors)
{
assert(!errors.empty());
String ret;
for (size_t i = 0; i < errors.size(); ++i)
{
if (i > 0)
ret.append("; ");
ret.append(fmt::format("line: {}, columns: {}, error: {}", errors[i].line, errors[i].col, errors[i].error));
}

return ret;
}
}

void FormatSchemaFactory::registerSchema(const String & schema_name, const String & format, std::string_view schema_body, ExistsOP exists_op, ContextPtr & context)
Expand All @@ -53,8 +43,17 @@ void FormatSchemaFactory::registerSchema(const String & schema_name, const Strin
std::lock_guard lock(mutex);
auto writer = FormatFactory::instance().getExternalSchemaWriter(format, schema_body, context, format_settings);
assert(writer); /* confirmed with checkSchemaType */
if (auto errors = writer->validate(); !errors.empty())
throw Exception(ErrorCodes::INVALID_DATA, "Invalid Protobuf schema, errors: {}", formatSchemaValidationErrors(errors));

try
{
writer->validate();
}
catch (DB::Exception & e)
{
e.addMessage(std::format("{} schema {} was invalid", format, schema_name));
e.rethrow();
}


auto result = writer->write(exists_op == ExistsOP::Replace);
if (!result && exists_op == ExistsOP::Throw)
Expand Down
4 changes: 4 additions & 0 deletions src/Formats/FormatSettings.h
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,10 @@ struct FormatSettings
std::string format_schema;
std::string format_schema_path;
bool is_server = false;
/// proton: starts
std::string kafka_schema_registry_url;
std::string kafka_schema_registry_credentials;
/// proton: ends
} schema;

struct
Expand Down
115 changes: 115 additions & 0 deletions src/Formats/KafkaSchemaRegistry.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
#include <Formats/KafkaSchemaRegistry.h>
#include <IO/HTTPCommon.h>
#include <IO/ReadHelpers.h>
#include <format>

#include <Poco/JSON/Parser.h>
zliang-min marked this conversation as resolved.
Show resolved Hide resolved
#include <Poco/Net/HTTPBasicCredentials.h>
#include <boost/algorithm/string/predicate.hpp>

namespace DB
{

namespace ErrorCodes
{
/// Malformed wire data: bad magic byte or truncated schema-id header.
extern const int INCORRECT_DATA;
/// Referenced by KafkaSchemaRegistry::readSchemaId() to detect a truncated read;
/// it was used below without being declared here, which fails to compile.
extern const int CANNOT_READ_ALL_DATA;
}

/// \param credentials_ holds "<username>:<password>"; a string without ':' is
/// treated as a bare username with an empty password.
KafkaSchemaRegistry::KafkaSchemaRegistry(const String & base_url_, const String & credentials_)
    : base_url(base_url_), logger(&Poco::Logger::get("KafkaSchemaRegistry"))
{
    assert(!base_url.empty());

    const auto sep = credentials_.find(':');
    if (sep == String::npos)
    {
        credentials.setUsername(credentials_);
    }
    else
    {
        credentials.setUsername(credentials_.substr(0, sep));
        credentials.setPassword(credentials_.substr(sep + 1));
    }
}

/// Fetches the schema text for the given registry id via HTTP GET on
/// "<base_url>/schemas/ids/<id>" and returns the "schema" field of the JSON response.
/// Throws DB::Exception on any network/parse failure, with the schema id appended
/// to the message by the outermost handler.
String KafkaSchemaRegistry::fetchSchema(UInt32 id)
{
    try
    {
        try
        {
            Poco::URI url(base_url, std::format("/schemas/ids/{}", id));
            LOG_TRACE(logger, "Fetching schema id = {}", id);

            /// One second for connect/send/receive. Just in case.
            ConnectionTimeouts timeouts({1, 0}, {1, 0}, {1, 0});

            Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_GET, url.getPathAndQuery(), Poco::Net::HTTPRequest::HTTP_1_1);
            /// NOTE(review): only the host is set here; for registries on non-default
            /// ports the Host header will lack the port — confirm this is intended.
            request.setHost(url.getHost());

            if (!credentials.empty())
                credentials.authenticate(request);

            /// Pooled session with a pool size of 1 per endpoint.
            auto session = makePooledHTTPSession(url, timeouts, 1);
            std::istream * response_body{};
            try
            {
                session->sendRequest(request);

                Poco::Net::HTTPResponse response;
                response_body = receiveResponse(*session, request, response, false);
            }
            catch (const Poco::Exception & e)
            {
                /// We use session data storage as storage for exception text
                /// Depend on it we can deduce to reconnect session or reresolve session host
                session->attachSessionData(e.message());
                throw;
            }
            /// Response body is JSON; the registry returns the schema under the "schema" key.
            Poco::JSON::Parser parser;
            auto json_body = parser.parse(*response_body).extract<Poco::JSON::Object::Ptr>();
            auto schema = json_body->getValue<std::string>("schema");
            LOG_TRACE(logger, "Successfully fetched schema id = {}\n{}", id, schema);
            return schema;
        }
        catch (const Exception &)
        {
            /// Already a DB::Exception — let the outer handler annotate it as-is.
            throw;
        }
        catch (const Poco::Exception & e)
        {
            /// Convert Poco errors (HTTP, JSON parsing) into DB::Exception.
            throw Exception(Exception::CreateFromPocoTag{}, e);
        }
    }
    catch (Exception & e)
    {
        /// Append context so callers see which schema id failed.
        e.addMessage(std::format("while fetching schema with id {}", id));
        throw;
    }
}

/// Reads the Confluent wire-format header from `in`: a single magic byte (0x00)
/// followed by a 4-byte big-endian schema id. Throws INCORRECT_DATA when the
/// header is missing, truncated, or carries a non-zero magic byte.
UInt32 KafkaSchemaRegistry::readSchemaId(ReadBuffer & in)
{
    uint8_t wire_magic = 0;
    uint32_t wire_schema_id = 0;

    try
    {
        readBinaryBigEndian(wire_magic, in);
        readBinaryBigEndian(wire_schema_id, in);
    }
    catch (const Exception & e)
    {
        /// empty or incomplete message without magic byte or schema id
        if (e.code() != ErrorCodes::CANNOT_READ_ALL_DATA)
            throw;
        throw Exception(ErrorCodes::INCORRECT_DATA, "Missing magic byte or schema identifier.");
    }

    if (wire_magic != 0x00)
        throw Exception(ErrorCodes::INCORRECT_DATA, "Invalid magic byte before schema identifier."
            " Must be zero byte, found 0x{:x} instead", wire_magic);

    return wire_schema_id;
}

}
29 changes: 29 additions & 0 deletions src/Formats/KafkaSchemaRegistry.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#pragma once

#include <IO/ReadBuffer.h>

#include <Poco/Net/HTTPBasicCredentials.h>
#include <Poco/URI.h>

namespace DB
{

/// A helper class helps working with Kafka schema registry.
/// Fetches schema definitions by id over HTTP, optionally with basic authentication.
class KafkaSchemaRegistry final
{
public:
    /// Reads the Confluent wire-format header from `in`: a magic byte followed by
    /// a 4-byte big-endian schema id. Throws on a missing or invalid header.
    static UInt32 readSchemaId(ReadBuffer & in);

    /// \param credentials_ is expected to be formatted in "<username>:<password>".
    KafkaSchemaRegistry(const String & base_url_, const String & credentials_);

    /// Fetches the schema text for `id` from "<base_url>/schemas/ids/<id>".
    /// Throws DB::Exception on network or response-parsing failures.
    String fetchSchema(UInt32 id);

private:
    Poco::URI base_url;
    /// Empty credentials mean the request is sent unauthenticated.
    Poco::Net::HTTPBasicCredentials credentials;

    /// Non-owning; points at the Poco logger registry's singleton entry.
    Poco::Logger* logger;
};

}
46 changes: 14 additions & 32 deletions src/Formats/ProtobufSchemas.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
#include "config.h"

#if USE_PROTOBUF
# include <Common/Exception.h>
# include <Common/LRUCache.h>
# include <Formats/FormatSchemaInfo.h>
# include <Processors/Formats/ISchemaWriter.h>
# include <Formats/KafkaSchemaRegistry.h>
# include <Formats/ProtobufSchemas.h>
# include <Processors/Formats/ISchemaWriter.h>
# include <google/protobuf/compiler/importer.h>
# include <Common/Exception.h>


namespace DB
Expand All @@ -14,32 +16,7 @@ namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int CANNOT_PARSE_PROTOBUF_SCHEMA;
/// proton: starts
extern const int INVALID_DATA;
/// proton: ends
}

/// proton: starts
namespace
{
class ErrorCollector final: public google::protobuf::io::ErrorCollector
{
private:
SchemaValidationErrors errors;

public:
void AddError(int line, google::protobuf::io::ColumnNumber column, const std::string & message) override
{
errors.emplace_back(line, column, message);
}

const SchemaValidationErrors & getErrors() const
{
return errors;
}
};
}
/// proton: starts

ProtobufSchemas & ProtobufSchemas::instance()
{
Expand Down Expand Up @@ -106,17 +83,22 @@ const google::protobuf::Descriptor * ProtobufSchemas::getMessageTypeForFormatSch
}

/// proton: starts
SchemaValidationErrors ProtobufSchemas::validateSchema(std::string_view schema)
/// Overrides google::protobuf::io::ErrorCollector:
void ProtobufSchemas::AddError(int line, google::protobuf::io::ColumnNumber column, const std::string & message)
{
throw Exception(ErrorCodes::CANNOT_PARSE_PROTOBUF_SCHEMA,
"Cannot parse schema, found an error at line {}, column {}, error: {}", line, column, message);
}

void ProtobufSchemas::validateSchema(std::string_view schema)
{
google::protobuf::io::ArrayInputStream input{schema.data(), static_cast<int>(schema.size())};
ErrorCollector error_collector;
google::protobuf::io::Tokenizer tokenizer(&input, &error_collector);
google::protobuf::io::Tokenizer tokenizer(&input, this);
google::protobuf::FileDescriptorProto descriptor;
google::protobuf::compiler::Parser parser;

parser.RecordErrorsTo(&error_collector);
parser.RecordErrorsTo(this);
parser.Parse(&tokenizer, &descriptor);
return error_collector.getErrors();
}
/// proton: ends

Expand Down
21 changes: 14 additions & 7 deletions src/Formats/ProtobufSchemas.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,13 @@
#include <base/types.h>
#include <boost/noncopyable.hpp>

/// proton: starts
#include <Formats/KafkaSchemaRegistry.h>

#include <google/protobuf/descriptor.h>
#include <google/protobuf/io/tokenizer.h>
/// proton: ends


namespace google
{
Expand All @@ -21,28 +28,28 @@ namespace protobuf
namespace DB
{
class FormatSchemaInfo;
/// proton: starts
struct SchemaValidationError;
using SchemaValidationErrors = std::vector<SchemaValidationError>;
/// proton: ends

/** Keeps parsed google protobuf schemas parsed from files.
* This class is used to handle the "Protobuf" input/output formats.
*/
class ProtobufSchemas : private boost::noncopyable
class ProtobufSchemas : public google::protobuf::io::ErrorCollector /* proton: updated */
{
public:
static ProtobufSchemas & instance();

ProtobufSchemas();
~ProtobufSchemas();
~ProtobufSchemas() override; /* proton: updated */

/// Parses the format schema, then parses the corresponding proto file, and returns the descriptor of the message type.
/// The function never returns nullptr, it throws an exception if it cannot load or parse the file.
const google::protobuf::Descriptor * getMessageTypeForFormatSchema(const FormatSchemaInfo & info);

/// proton: starts
SchemaValidationErrors validateSchema(std::string_view schema);
void AddError(int line, google::protobuf::io::ColumnNumber column, const std::string & message) override;

/// Validates the given schema and throw a DB::Exception if the schema is invalid.
/// The exception will contain the first error encountered when validating the schema, i.e. there could be more errors.
void validateSchema(std::string_view schema);
/// proton: ends
private:
class ImporterWithSourceTree;
Expand Down
1 change: 1 addition & 0 deletions src/KafkaLog/KafkaWALSettings.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ struct KafkaWALSettings
settings.push_back(fmt::format("shared_subscription_flush_threshold_bytes={}", shared_subscription_flush_threshold_bytes));
settings.push_back(fmt::format("shared_subscription_flush_threshold_ms={}", shared_subscription_flush_threshold_ms));
settings.push_back(fmt::format("auth.security.protocol={}", auth.security_protocol));
settings.push_back(fmt::format("auth.sasl.mechanism={}", auth.sasl_mechanism));
settings.push_back(fmt::format("auth.username={}", auth.username));
settings.push_back(fmt::format("auth.password={}", auth.password));
settings.push_back(fmt::format("auth.ssl.ca.location={}", auth.ssl_ca_cert_file));
Expand Down
Loading
Loading