Skip to content

Commit

Permalink
Merge pull request #311 from markif/#310
Browse files Browse the repository at this point in the history
Some performance improvements based on insights from #308.
  • Loading branch information
c-rack committed Dec 24, 2015
2 parents 8decfa3 + 58a13f7 commit 0a15850
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 33 deletions.
23 changes: 17 additions & 6 deletions src/main/java/zmq/Decoder.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class Decoder extends DecoderBase
private static final int MESSAGE_READY = 3;

private final byte[] tmpbuf;
private final ByteBuffer tmpbufWrap;
private Msg inProgress;
private final long maxmsgsize;
private IMsgSink msgSink;
Expand All @@ -49,9 +50,11 @@ public Decoder(int bufsize, long maxmsgsize)
super(bufsize);
this.maxmsgsize = maxmsgsize;
tmpbuf = new byte[8];
tmpbufWrap = ByteBuffer.wrap(tmpbuf);
tmpbufWrap.limit(1);

// At the beginning, read one byte and go to oneByteSizeReady state.
nextStep(tmpbuf, 1, ONE_BYTE_SIZE_READY);
nextStep(tmpbufWrap, ONE_BYTE_SIZE_READY);
}

// Set the receiver of decoded messages.
Expand Down Expand Up @@ -83,9 +86,11 @@ private boolean oneByteSizeReady()
// First byte of size is read. If it is 0xff(-1 for java byte) read 8-byte size.
// Otherwise allocate the buffer for message data and read the
// message data into it.
tmpbufWrap.position(0);
byte first = tmpbuf[0];
if (first == -1) {
nextStep(tmpbuf, 8, EIGHT_BYTE_SIZE_READY);
tmpbufWrap.limit(8);
nextStep(tmpbufWrap, EIGHT_BYTE_SIZE_READY);
}
else {
// There has to be at least one byte (the flags) in the message).
Expand All @@ -111,7 +116,8 @@ private boolean oneByteSizeReady()
inProgress = getMsgAllocator().allocate(size - 1);
}

nextStep(tmpbuf, 1, FLAGS_READY);
tmpbufWrap.limit(1);
nextStep(tmpbufWrap, FLAGS_READY);
}
return true;

Expand All @@ -121,7 +127,9 @@ private boolean eightByteSizeReady()
{
// 8-byte payload length is read. Allocate the buffer
// for message body and read the message data into it.
final long payloadLength = ByteBuffer.wrap(tmpbuf).getLong();
tmpbufWrap.position(0);
tmpbufWrap.limit(8);
final long payloadLength = tmpbufWrap.getLong(0);

// There has to be at least one byte (the flags) in the message).
if (payloadLength <= 0) {
Expand All @@ -147,7 +155,8 @@ private boolean eightByteSizeReady()
// message and thus we can treat it as uninitialized...
inProgress = getMsgAllocator().allocate(msgSize);

nextStep(tmpbuf, 1, FLAGS_READY);
tmpbufWrap.limit(1);
nextStep(tmpbufWrap, FLAGS_READY);

return true;
}
Expand Down Expand Up @@ -181,7 +190,9 @@ private boolean messageReady()
return false;
}

nextStep(tmpbuf, 1, ONE_BYTE_SIZE_READY);
tmpbufWrap.position(0);
tmpbufWrap.limit(1);
nextStep(tmpbufWrap, ONE_BYTE_SIZE_READY);

return true;
}
Expand Down
17 changes: 10 additions & 7 deletions src/main/java/zmq/Encoder.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@ public class Encoder extends EncoderBase

private Msg inProgress;
private final byte[] tmpbuf;
private final ByteBuffer tmpbufWrap;
private IMsgSource msgSource;

public Encoder(int bufsize)
{
super(bufsize);
tmpbuf = new byte[10];
tmpbufWrap = ByteBuffer.wrap(tmpbuf);
// Write 0 bytes to the batch and go to messageReady state.
nextStep((byte[]) null, 0, MESSAGE_READY, true);
}
Expand Down Expand Up @@ -93,18 +95,19 @@ private boolean messageReady()
// For messages less than 255 bytes long, write one byte of message size.
// For longer messages write 0xff escape character followed by 8-byte
// message size. In both cases 'flags' field follows.

tmpbufWrap.position(0);
if (size < 255) {
tmpbufWrap.limit(2);
tmpbuf[0] = (byte) size;
tmpbuf[1] = (byte) (inProgress.flags() & Msg.MORE);
nextStep(tmpbuf, 2, SIZE_READY, false);
nextStep(tmpbufWrap, SIZE_READY, false);
}
else {
ByteBuffer b = ByteBuffer.wrap(tmpbuf);
b.put(0, (byte) 0xff);
b.putLong(1, size);
b.put(9, (byte) (inProgress.flags() & Msg.MORE));
nextStep(b, SIZE_READY, false);
tmpbufWrap.limit(10);
tmpbuf[0] = (byte) 0xff;
tmpbufWrap.putLong(1, size);
tmpbuf[9] = (byte) (inProgress.flags() & Msg.MORE);
nextStep(tmpbufWrap, SIZE_READY, false);
}

return true;
Expand Down
30 changes: 19 additions & 11 deletions src/main/java/zmq/EncoderBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,24 +103,26 @@ public Transfer getData(ByteBuffer buffer)
// other engines running in the same I/O thread for excessive
// amounts of time.
if (this.buffer.position() == 0 && toWrite >= bufferSize) {
Transfer t;
writeBuf.position(writePos);
writeBuf.limit(writePos + toWrite);
t = new Transfer.ByteBufferTransfer(writeBuf);
Transfer t = new Transfer.ByteBufferTransfer(writeBuf);
writePos = 0;
toWrite = 0;

return t;
}

// Copy data to the buffer. If the buffer is full, return.
int toCopy = Math.min(toWrite, buffer.remaining());
if (toCopy > 0) {
writeBuf.position(writePos);
writeBuf.limit(writePos + toCopy);
int remaining = buffer.remaining();
if (toWrite <= remaining) {
buffer.put(writeBuf);
writePos += toCopy;
toWrite -= toCopy;
writePos = 0;
toWrite = 0;
}
else {
writeBuf.limit(writePos + remaining);
buffer.put(writeBuf);
writePos += remaining;
toWrite -= remaining;
writeBuf.limit(writePos + toWrite);
}
}

Expand Down Expand Up @@ -169,7 +171,13 @@ protected void nextStep(Msg msg, int state, boolean beginning)
protected void nextStep(byte[] buf, int toWrite,
int next, boolean beginning)
{
writeBuf = buf != null ? ByteBuffer.wrap(buf) : null;
if (buf != null) {
writeBuf = ByteBuffer.wrap(buf);
writeBuf.limit(toWrite);
}
else {
writeBuf = null;
}
writeChannel = null;
writePos = 0;
this.toWrite = toWrite;
Expand Down
20 changes: 15 additions & 5 deletions src/main/java/zmq/V1Decoder.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public class V1Decoder extends DecoderBase
private static final int MESSAGE_READY = 3;

private final byte[] tmpbuf;
private final ByteBuffer tmpbufWrap;
private Msg inProgress;
private IMsgSink msgSink;
private final long maxmsgsize;
Expand All @@ -42,9 +43,11 @@ public V1Decoder(int bufsize, long maxmsgsize, IMsgSink session)
msgSink = session;

tmpbuf = new byte[8];
tmpbufWrap = ByteBuffer.wrap(tmpbuf);
tmpbufWrap.limit(1);

// At the beginning, read one byte and go to ONE_BYTE_SIZE_READY state.
nextStep(tmpbuf, 1, FLAGS_READY);
nextStep(tmpbufWrap, FLAGS_READY);
}

// Set the receiver of decoded messages.
Expand Down Expand Up @@ -102,7 +105,9 @@ private boolean eightByteSizeReady()
{
// The payload size is encoded as 64-bit unsigned integer.
// The most significant byte comes first.
final long msgSize = ByteBuffer.wrap(tmpbuf).getLong();
tmpbufWrap.position(0);
tmpbufWrap.limit(8);
final long msgSize = tmpbufWrap.getLong(0);

// Message size must not exceed the maximum allowed size.
if (maxmsgsize >= 0) {
Expand Down Expand Up @@ -141,11 +146,14 @@ private boolean flagsReady()

// The payload length is either one or eight bytes,
// depending on whether the 'large' bit is set.
tmpbufWrap.position(0);
if ((first & V1Protocol.LARGE_FLAG) > 0) {
nextStep(tmpbuf, 8, EIGHT_BYTE_SIZE_READY);
tmpbufWrap.limit(8);
nextStep(tmpbufWrap, EIGHT_BYTE_SIZE_READY);
}
else {
nextStep(tmpbuf, 1, ONE_BYTE_SIZE_READY);
tmpbufWrap.limit(1);
nextStep(tmpbufWrap, ONE_BYTE_SIZE_READY);
}

return true;
Expand All @@ -169,7 +177,9 @@ private boolean messageReady()
return false;
}

nextStep(tmpbuf, 1, FLAGS_READY);
tmpbufWrap.position(0);
tmpbufWrap.limit(1);
nextStep(tmpbufWrap, FLAGS_READY);

return true;
}
Expand Down
12 changes: 8 additions & 4 deletions src/main/java/zmq/V1Encoder.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@ public class V1Encoder extends EncoderBase

private Msg inProgress;
private final byte[] tmpbuf;
private final ByteBuffer tmpbufWrap;
private IMsgSource msgSource;

public V1Encoder(int bufsize, IMsgSource session)
{
super(bufsize);
tmpbuf = new byte[9];
tmpbufWrap = ByteBuffer.wrap(tmpbuf);
msgSource = session;

// Write 0 bytes to the batch and go to messageReady state.
Expand Down Expand Up @@ -97,14 +99,16 @@ private boolean messageReady()
// the length is encoded as 8-bit unsigned integer. For larger
// messages, 64-bit unsigned integer in network byte order is used.
final int size = inProgress.size();
tmpbufWrap.position(0);
if (size > 255) {
ByteBuffer b = ByteBuffer.wrap(tmpbuf);
b.putLong(1, size);
nextStep(b, SIZE_READY, false);
tmpbufWrap.limit(9);
tmpbufWrap.putLong(1, size);
nextStep(tmpbufWrap, SIZE_READY, false);
}
else {
tmpbufWrap.limit(2);
tmpbuf[1] = (byte) (size);
nextStep(tmpbuf, 2, SIZE_READY, false);
nextStep(tmpbufWrap, SIZE_READY, false);
}
return true;
}
Expand Down

0 comments on commit 0a15850

Please sign in to comment.