Skip to content

Commit

Permalink
fix varint serialization
Browse files Browse the repository at this point in the history
  • Loading branch information
tesseract committed Sep 18, 2023
1 parent 37b0990 commit c8409f2
Show file tree
Hide file tree
Showing 5 changed files with 272 additions and 234 deletions.
51 changes: 47 additions & 4 deletions ydb/core/kafka_proxy/kafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -324,8 +324,27 @@ class TKafkaWritable {
TKafkaWritable& operator<<(const TKafkaRawBytes& val);
TKafkaWritable& operator<<(const TKafkaRawString& val);

void writeUnsignedVarint(TKafkaUint32 val);
void writeVarint(TKafkaInt32 val);
template<class T, typename U = std::make_unsigned_t<T>>
void writeUnsignedVarint(T v) {
static constexpr T Mask = Max<T>() - 0x7F;

U value = v;
while ((value & Mask) != 0L) {
ui8 b = (ui8) ((value & 0x7f) | 0x80);
write((const char*)&b, sizeof(b));
value >>= 7;
}
ui8 b = (ui8) value;
write((const char*)&b, sizeof(b));
}

template<class T, typename U = std::make_unsigned_t<T>>
void writeVarint(T value) {
static constexpr ui8 Shift = (sizeof(T) << 3) - 1;

writeUnsignedVarint<U>((value << 1) ^ (value >> Shift));
}

void write(const char* val, size_t length);

private:
Expand All @@ -351,8 +370,32 @@ class TKafkaReadable {

void read(char* val, size_t length);
char get();
ui32 readUnsignedVarint();
i32 readVarint();

template<class T, typename U = std::make_unsigned_t<T>>
T readUnsignedVarint() {
static constexpr size_t MaxLength = (sizeof(T) << 3) - 4;

U value = 0;
size_t i = 0;
ui8 b;
while (((b = static_cast<ui8>(get())) & 0x80) != 0) {
if (i > MaxLength) {
ythrow yexception() << "illegal varint length";
}
value |= ((U)(b & 0x7f)) << i;
i += 7;
}

value |= ((U)b) << i;
return value;
}

template<class T, typename S = std::make_signed_t<T>, typename U = std::make_unsigned_t<T>>
S readVarint() {
U v = readUnsignedVarint<U>();
return (v >> 1) ^ -static_cast<S>(v & 1);
}

TArrayRef<const char> Bytes(size_t length);

// returns a character from the specified position. The current position does not change.
Expand Down
Loading

0 comments on commit c8409f2

Please sign in to comment.