Skip to content

Commit

Permalink
fixed format
Browse files Browse the repository at this point in the history
  • Loading branch information
zliang-min committed Feb 26, 2024
1 parent 3797768 commit 90a0ae7
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 21 deletions.
6 changes: 3 additions & 3 deletions src/Processors/Formats/Impl/AvroRowInputFormat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ class InputStreamReadBufferAdapter : public avro::InputStream
*data = reinterpret_cast<const uint8_t *>(in.position());
*len = in.available();

in.position() += in.available();
in.position() += in.available();
return true;
}

Expand Down Expand Up @@ -827,12 +827,12 @@ AvroConfluentRowInputFormat::AvroConfluentRowInputFormat(
const Block & header_, ReadBuffer & in_, Params params_, const FormatSettings & format_settings_)
: IRowInputFormat(header_, in_, params_, ProcessorID::AvroConfluentRowInputFormatID)
, schema_registry(getConfluentSchemaRegistry(format_settings_))
/// , input_stream(std::make_unique<InputStreamReadBufferAdapter>(*in)) /* proton: updates */
/// , input_stream(std::make_unique<InputStreamReadBufferAdapter>(*in)) /* proton: updated */
, decoder(avro::binaryDecoder())
, format_settings(format_settings_)

{
/// decoder->init(*input_stream); /* proton: updates */
/// decoder->init(*input_stream); /* proton: updated */
}

bool AvroConfluentRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & ext)
Expand Down
4 changes: 2 additions & 2 deletions src/Processors/Formats/Impl/AvroRowInputFormat.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ class AvroConfluentRowInputFormat final : public IRowInputFormat
class SchemaRegistry;

private:
bool readRow(MutableColumns & columns, RowReadExtension & ext) override; /* proton: updates */
bool readRow(MutableColumns & columns, RowReadExtension & ext) override; /* proton: updated */

bool allowSyncAfterError() const override { return true; }
void syncAfterError() override;
Expand All @@ -155,7 +155,7 @@ class AvroConfluentRowInputFormat final : public IRowInputFormat
std::unordered_map<SchemaId, AvroDeserializer> deserializer_cache;
const AvroDeserializer & getOrCreateDeserializer(SchemaId schema_id);

/// avro::InputStreamPtr input_stream; /* proton: updates */
/// avro::InputStreamPtr input_stream; /* proton: updated */
avro::DecoderPtr decoder;
FormatSettings format_settings;
};
Expand Down
36 changes: 20 additions & 16 deletions src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
#include "ProtobufRowInputFormat.h"

#if USE_PROTOBUF
# include <Core/Block.h>
# include <Formats/FormatFactory.h>
# include <Formats/FormatSchemaInfo.h>
# include <Formats/ProtobufReader.h>
# include <Formats/ProtobufSchemas.h>
# include <Formats/ProtobufSerializer.h>
# include <IO/WriteBufferFromFile.h>
# include <Interpreters/Context.h>
# include <base/range.h>
# include <Core/Block.h>
# include <Formats/FormatFactory.h>
# include <Formats/FormatSchemaInfo.h>
# include <Formats/ProtobufReader.h>
# include <Formats/ProtobufSchemas.h>
# include <Formats/ProtobufSerializer.h>
# include <Interpreters/Context.h>
# include <IO/WriteBufferFromFile.h>
# include <base/range.h>

/// proton: starts
# include <Common/LRUCache.h>
# include <Formats/KafkaSchemaRegistry.h>
# include <IO/VarInt.h>
# include <google/protobuf/compiler/parser.h>
# include <google/protobuf/descriptor.pb.h>
# include <google/protobuf/io/tokenizer.h>
# include <Common/LRUCache.h>
# include <Formats/KafkaSchemaRegistry.h>
# include <IO/VarInt.h>
# include <google/protobuf/compiler/parser.h>
# include <google/protobuf/descriptor.pb.h>
# include <google/protobuf/io/tokenizer.h>
/// proton: ends

namespace DB
Expand Down Expand Up @@ -124,7 +124,11 @@ void registerInputFormatProtobuf(FormatFactory & factory){

ProtobufSchemaReader::ProtobufSchemaReader(const FormatSettings & format_settings)
: schema_info(
format_settings.schema.format_schema, "Protobuf", true, format_settings.schema.is_server, format_settings.schema.format_schema_path)
format_settings.schema.format_schema,
"Protobuf",
true,
format_settings.schema.is_server,
format_settings.schema.format_schema_path)
{
}

Expand Down

0 comments on commit 90a0ae7

Please sign in to comment.