Skip to content

Commit

Permalink
Avoid un-necessary byte copies when decoding a Vert.x Buffer to a pro…
Browse files Browse the repository at this point in the history
…tobuf object and perform more optimal allocation.

Motivation:

The current implementation of the bridge uses a plain ByteArrayInputStream and then let protobuf parse the message. It turns out there is a KnownLength interface used by the parser to know the actual buffer size to be decoded and optimize the decoding process.

In addition we can use Netty ByteBufInputStream to avoid using ByteArrayInputStream and un-necessary copies.

Changes:

Swap usage of ByteArrayInputStream with a class that extends ByteBufInputStream and implement KnownLength.
  • Loading branch information
vietj committed Mar 5, 2025
1 parent f34a6a8 commit b979b88
Showing 1 changed file with 27 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,15 @@
package io.vertx.grpcio.common.impl;

import io.grpc.Decompressor;
import io.grpc.KnownLength;
import io.grpc.MethodDescriptor;
import io.vertx.grpc.common.WireFormat;
import io.netty.buffer.ByteBufInputStream;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.internal.buffer.BufferInternal;
import io.vertx.grpc.common.GrpcMessage;
import io.vertx.grpc.common.GrpcMessageDecoder;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

Expand All @@ -30,15 +33,31 @@ public BridgeMessageDecoder(MethodDescriptor.Marshaller<T> marshaller, Decompres
this.decompressor = decompressor;
}

private static class KnownLengthStream extends ByteBufInputStream implements KnownLength {
public KnownLengthStream(Buffer buffer) {
super(((BufferInternal)buffer).getByteBuf(), buffer.length());
}

@Override
public void close() {
try {
super.close();
} catch (IOException ignore) {
}
}
}

@Override
public T decode(GrpcMessage msg) {
if (msg.encoding().equals("identity")) {
return marshaller.parse(new ByteArrayInputStream(msg.payload().getBytes()));
} else {
try (InputStream in = decompressor.decompress(new ByteArrayInputStream(msg.payload().getBytes()))) {
return marshaller.parse(in);
} catch (IOException e) {
throw new RuntimeException(e);
try (KnownLengthStream kls = new KnownLengthStream(msg.payload())) {
if (msg.encoding().equals("identity")) {
return marshaller.parse(kls);
} else {
try (InputStream in = decompressor.decompress(kls)) {
return marshaller.parse(in);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}
Expand Down

0 comments on commit b979b88

Please sign in to comment.