From d61ed6aa9828e445870a64c44cdf1c4a2dcfdd4d Mon Sep 17 00:00:00 2001 From: fredoboulo Date: Sat, 16 Mar 2019 22:17:08 +0100 Subject: [PATCH] Proposal of implementations for read/write serialization. ZNeedle within a ZFrame, ZPicture within a ZMsg --- src/main/java/org/zeromq/ZMQ.java | 451 +----------------- src/main/java/org/zeromq/proto/ZNeedle.java | 224 +++++++++ src/main/java/org/zeromq/proto/ZPicture.java | 431 +++++++++++++++++ .../java/org/zeromq/proto/package-info.java | 4 + .../mechanism/curve/CurveClientMechanism.java | 4 +- .../mechanism/curve/CurveServerMechanism.java | 6 +- src/main/java/zmq/socket/reqrep/Req.java | 2 +- src/main/java/zmq/util/Utils.java | 8 +- src/main/java/zmq/util/Wire.java | 126 ++++- src/test/java/org/zeromq/TestZMQ.java | 71 +-- .../java/org/zeromq/proto/ZNeedleTest.java | 234 +++++++++ .../java/org/zeromq/proto/ZPictureTest.java | 174 +++++++ src/test/java/zmq/util/WireTest.java | 58 +++ 13 files changed, 1291 insertions(+), 502 deletions(-) create mode 100644 src/main/java/org/zeromq/proto/ZNeedle.java create mode 100644 src/main/java/org/zeromq/proto/ZPicture.java create mode 100644 src/main/java/org/zeromq/proto/package-info.java create mode 100644 src/test/java/org/zeromq/proto/ZNeedleTest.java create mode 100644 src/test/java/org/zeromq/proto/ZPictureTest.java diff --git a/src/main/java/org/zeromq/ZMQ.java b/src/main/java/org/zeromq/ZMQ.java index c25c6c90f..af0006a72 100644 --- a/src/main/java/org/zeromq/ZMQ.java +++ b/src/main/java/org/zeromq/ZMQ.java @@ -11,9 +11,10 @@ import java.util.Random; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; +import org.zeromq.proto.ZPicture; + import zmq.Ctx; import zmq.SocketBase; import zmq.ZError; @@ -429,8 +430,7 @@ public enum Error EPROTONOSUPPORT(ZError.EPROTONOSUPPORT), ENOBUFS(ZError.ENOBUFS), ENETDOWN(ZError.ENETDOWN), - EADDRINUSE(ZError.EADDRINUSE) - { + EADDRINUSE(ZError.EADDRINUSE) { @Override public String getMessage() { @@ -441,32 +441,28 @@ public String getMessage() ECONNREFUSED(ZError.ECONNREFUSED), EINPROGRESS(ZError.EINPROGRESS), EHOSTUNREACH(ZError.EHOSTUNREACH), - EMTHREAD(ZError.EMTHREAD) - { + EMTHREAD(ZError.EMTHREAD) { @Override public String getMessage() { return "No thread available"; } }, - EFSM(ZError.EFSM) - { + EFSM(ZError.EFSM) { @Override public String getMessage() { return "Operation cannot be accomplished in current state"; } }, - ENOCOMPATPROTO(ZError.ENOCOMPATPROTO) - { + ENOCOMPATPROTO(ZError.ENOCOMPATPROTO) { @Override public String getMessage() { return "The protocol is not compatible with the socket type"; } }, - ETERM(ZError.ETERM) - { + ETERM(ZError.ETERM) { @Override public String getMessage() { @@ -495,8 +491,7 @@ public String getMessage() EPROTO(ZError.EPROTO); private static final Map map = new HashMap<>(Error.values().length); - static - { + static { for (Error e : Error.values()) { map.put(e.code, e); } @@ -814,121 +809,6 @@ public static class Socket implements Closeable private final SocketBase base; private final AtomicBoolean isClosed = new AtomicBoolean(false); - // Network data encoding macros - - // Put a 1-byte number to the frame - private static void putNumber1(ByteBuffer needle, int value) - { - needle.put((byte) value); - } - - // Get a 1-byte number to the frame - // then make it unsigned - private static byte getNumber1(ByteBuffer needle) - { - int value = needle.get(); - if (value < 0) { - value = (0xff) & value; - } - return (byte) value; - } - - // Put a 2-byte number to the frame - private static void putNumber2(ByteBuffer needle, int value) - { - needle.putShort((short) value); - } - - // Get a 2-byte number to the frame - private static int getNumber2(ByteBuffer needle) - { - int value = needle.getShort(); - if (value < 0) { - value = (0xffff) & value; - } - return value; - } - - // Put a 4-byte number to the frame - private static void putNumber4(ByteBuffer needle, long value) - { - needle.putInt((int) value); - } - - // Get a 4-byte number to the frame - // then make it unsigned - private static long getNumber4(ByteBuffer needle) - { - long value = needle.getInt(); - if (value < 0) { - value = (0xffffffff) & value; - } - return value; - } - - // Put a 8-byte number to the frame - private static void putNumber8(ByteBuffer needle, long value) - { - needle.putLong(value); - } - - // Get a 8-byte number to the frame - private static long getNumber8(ByteBuffer needle) - { - return needle.getLong(); - } - - // Put a block to the frame - private static void putBlock(ByteBuffer needle, byte[] value, int size) - { - needle.put(value, 0, size); - } - - // Get a block from the frame - private static byte[] getBlock(ByteBuffer needle, int size) - { - byte[] value = new byte[size]; - needle.get(value); - - return value; - } - - // Put a string to the frame - private static void putString(ByteBuffer needle, String value) - { - byte[] bytes = value.getBytes(ZMQ.CHARSET); - needle.put((byte) bytes.length); - needle.put(bytes); - } - - // Get a string from the frame - private static String getString(ByteBuffer needle) - { - int size = getNumber1(needle); - byte[] value = new byte[size]; - needle.get(value); - - return new String(value, ZMQ.CHARSET); - } - - // Put a string to the frame - private static void putLongString(ByteBuffer needle, String value) - { - byte[] bytes = value.getBytes(ZMQ.CHARSET); - needle.putInt(bytes.length); - needle.put(bytes); - } - - // Get a string from the frame - private static String getLongString(ByteBuffer needle) - { - long size = getNumber4(needle); - byte[] value = new byte[(int) size]; - needle.get(value); - - return new String(value, ZMQ.CHARSET); - } - /** * Class constructor. * @@ -3415,66 +3295,9 @@ public int sendByteBuffer(ByteBuffer data, int flags) @Draft public boolean sendPicture(String picture, Object... args) { - ZMsg msg = new ZMsg(); - for (int pictureIndex = 0, argIndex = 0; pictureIndex < picture.length(); pictureIndex++, argIndex++) { - char pattern = picture.charAt(pictureIndex); - switch (pattern) { - case 'i': { - msg.add(String.format("%d", (int) args[argIndex])); - break; - } - case '1': { - msg.add(String.format("%d", (0xff) & (int) args[argIndex])); - break; - } - case '2': { - msg.add(String.format("%d", (0xffff) & (int) args[argIndex])); - break; - } - case '4': { - msg.add(String.format("%d", (0xffffffff) & (long) args[argIndex])); - break; - } - case '8': { - msg.add(String.format("%d", (long) args[argIndex])); - break; - } - case 's': { - msg.add((String) args[argIndex]); - break; - } - case 'b': { - msg.add((byte[]) args[argIndex]); - break; - } - case 'f': { - msg.add((ZFrame) args[argIndex]); - break; - } - case 'm': { - if (pictureIndex != picture.length() - 1) { - throw new ZMQException("'m' (ZMsg) only valid at end of picture", ZError.EPROTO); - } - ZMsg msgParm = (ZMsg) args[argIndex]; - while (msgParm.size() > 0) { - msg.add(msgParm.pop()); - } - break; - } - case 'z': { - msg.add((byte[]) null); - argIndex--; - break; - } - default: - throw new ZMQException("invalid picture element '" + pattern + "'", ZError.EPROTO); - } - } - return msg.send(this, false); + return new ZPicture().sendPicture(this, picture, args); } - private static final int BINARY_PICTURE_SEND_MAX_FRAMES = 32; // Arbitrary limit, for now - /** * Queues a binary encoded 'picture' message to the socket (or actor), so it can be sent. * This method is similar to {@link #sendPicture(String, Object...)}, except the arguments @@ -3504,137 +3327,7 @@ public boolean sendPicture(String picture, Object... args) @Draft public boolean sendBinaryPicture(String picture, Object... args) { - // Pass 1: calculate total size of data frame - int frameSize = 0; - ZFrame[] frames = new ZFrame[BINARY_PICTURE_SEND_MAX_FRAMES]; - final AtomicInteger nbrFrames = new AtomicInteger(0); // Size of this table - - for (int index = 0; index < picture.length(); index++) { - char pattern = picture.charAt(index); - switch (pattern) { - case '1': { - frameSize += 1; - break; - } - case '2': { - frameSize += 2; - break; - } - case '4': { - frameSize += 4; - break; - } - case '8': { - frameSize += 8; - break; - } - case 's': { - String string = (String) args[index]; - frameSize += 1 + (string != null ? string.getBytes(ZMQ.CHARSET).length : 0); - break; - } - case 'S': { - String string = (String) args[index]; - frameSize += 4 + (string != null ? string.getBytes(ZMQ.CHARSET).length : 0); - break; - } - case 'c': { - byte[] block = (byte[]) args[index]; - frameSize += 4 + block.length; - break; - } - case 'f': { - ZFrame frame = (ZFrame) args[index]; - if (nbrFrames.get() > BINARY_PICTURE_SEND_MAX_FRAMES) { - throw new ZMQException("Max no of frames exceeded", ZError.EPROTO); - } - - frames[nbrFrames.getAndIncrement()] = frame; - break; - } - case 'm': { - if (index != picture.length() - 1) { - throw new ZMQException("'m' (ZMsg) only valid at end of picture", ZError.EPROTO); - } - - ZMsg msg = (ZMsg) args[index]; - if (msg != null) { - msg.forEach(frame -> { - if (nbrFrames.get() > BINARY_PICTURE_SEND_MAX_FRAMES) { - throw new ZMQException("Max no of frames exceeded", ZError.EPROTO); - } - - frames[nbrFrames.getAndIncrement()] = frame; - }); - } - else { - frames[nbrFrames.getAndIncrement()] = new ZFrame(); - } - - break; - } - default: - throw new ZMQException("invalid picture element '" + pattern + "'", ZError.EPROTO); - } - } - - // Pass 2: encode data into data frame - ByteBuffer needle = ByteBuffer.allocate(frameSize); - for (int index = 0; index < picture.length(); index++) { - char pattern = picture.charAt(index); - switch (pattern) { - case '1': { - putNumber1(needle, (int) args[index]); - break; - } - case '2': { - putNumber2(needle, (int) args[index]); - break; - } - case '4': { - putNumber4(needle, (long) args[index]); - break; - } - case '8': { - putNumber8(needle, (long) args[index]); - break; - } - case 's': { - putString(needle, (String) args[index]); - break; - } - case 'S': { - putLongString(needle, (String) args[index]); - break; - } - case 'c': { - byte[] block = (byte[]) args[index]; - putNumber4(needle, block.length); - putBlock(needle, block, block.length); - break; - } - case 'f': - case 'm': - break; - default: - throw new ZMQException("invalid picture element '" + pattern + "'", ZError.EPROTO); - } - } - - // Now send the data frame - needle.flip(); - boolean rc = sendByteBuffer(needle, nbrFrames.get() > 0 ? zmq.ZMQ.ZMQ_SNDMORE : 0) > 0; - if (rc) { - // Now send any additional frames - for (int frameNbr = 0; frameNbr < nbrFrames.get(); frameNbr++) { - boolean more = frameNbr < nbrFrames.get() - 1; - boolean success = frames[frameNbr].sendAndDestroy(this, more ? zmq.ZMQ.ZMQ_SNDMORE : 0); - if (!rc) { - break; - } - } - } - return rc; + return new ZPicture().sendBinaryPicture(this, picture, args); } /** @@ -3819,68 +3512,9 @@ public String recvStr(int flags) @Draft public Object[] recvPicture(String picture) { - Object[] elements = new Object[picture.length()]; - for (int index = 0; index < picture.length(); index++) { - char pattern = picture.charAt(index); - switch (pattern) { - case 'i': { - elements[index] = Integer.valueOf(recvStr()); - break; - } - case '1': { - elements[index] = (0xff) & Integer.valueOf(recvStr()); - break; - } - case '2': { - elements[index] = (0xffff) & Integer.valueOf(recvStr()); - break; - } - case '4': { - elements[index] = (0xffffffff) & Long.valueOf(recvStr()); - break; - } - case '8': { - elements[index] = Long.valueOf(recvStr()); - break; - } - case 's': { - elements[index] = recvStr(); - break; - } - case 'b': { - elements[index] = recv(); - break; - } - case 'f': { - elements[index] = ZFrame.recvFrame(this); - - break; - } - case 'm': { - if (index != picture.length() - 1) { - throw new ZMQException("'m' (ZMsg) only valid at end of picture", ZError.EPROTO); - } - elements[index] = ZMsg.recvMsg(this); - break; - } - case 'z': { - ZFrame zeroFrame = ZFrame.recvFrame(this); - if (zeroFrame == null || zeroFrame.size() > 0) { - throw new ZMQException("zero frame is not empty", ZError.EPROTO); - } - elements[index] = new ZFrame(); - break; - } - default: - throw new ZMQException("invalid picture element '" + pattern + "'", ZError.EPROTO); - } - } - return elements; + return new ZPicture().recvPicture(this, picture); } - // This is the largest size we allow for an incoming longstr or chunk (1M) - private static final int BINARY_PICTURE_RECV_MAX_ALLOC_SIZE = 1024 * 1024; - /** * Receive a binary encoded 'picture' message from the socket (or actor). * This method is similar to {@link #recv()}, except the arguments are encoded @@ -3895,68 +3529,7 @@ public Object[] recvPicture(String picture) @Draft public Object[] recvBinaryPicture(final String picture) { - // Get the data frame - final ByteBuffer needle = ByteBuffer.wrap(recv()); - if (needle == null) { - return null; - } - - Object[] results = new Object[picture.length()]; - for (int index = 0; index < picture.length(); index++) { - char pattern = picture.charAt(index); - switch (pattern) { - case '1': { - results[index] = getNumber1(needle); - break; - } - case '2': { - results[index] = getNumber2(needle); - break; - } - case '4': { - results[index] = getNumber4(needle); - break; - } - case '8': { - results[index] = getNumber8(needle); - break; - } - case 's': { - results[index] = getString(needle); - break; - } - case 'S': { - results[index] = getLongString(needle); - break; - } - case 'c': { - int blockSize = (int) getNumber4(needle); - if (blockSize > BINARY_PICTURE_RECV_MAX_ALLOC_SIZE) { - throw new ZMQException("block size " + blockSize + "larger than the maximum " + BINARY_PICTURE_RECV_MAX_ALLOC_SIZE, - ZError.EMSGSIZE); - } - results[index] = getBlock(needle, blockSize); - break; - } - case 'f': { - // Get next frame off socket - results[index] = ZFrame.recvFrame(this); - break; - } - case 'm': { - if (index != picture.length() - 1) { - throw new ZMQException("'m' (ZMsg) only valid at end of picture", ZError.EPROTO); - } - - // Get zero or more remaining frames - results[index] = ZMsg.recvMsg(this); - break; - } - default: - throw new ZMQException("invalid picture element '" + pattern + "'", ZError.EPROTO); - } - } - return results; + return new ZPicture().recvBinaryPicture(this, picture); } /** diff --git a/src/main/java/org/zeromq/proto/ZNeedle.java b/src/main/java/org/zeromq/proto/ZNeedle.java new file mode 100644 index 000000000..114cf86d2 --- /dev/null +++ b/src/main/java/org/zeromq/proto/ZNeedle.java @@ -0,0 +1,224 @@ +package org.zeromq.proto; + +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.function.BiFunction; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.zeromq.ZFrame; + +import zmq.util.Draft; +import zmq.util.Utils; +import zmq.util.Wire; + +/** + * Needle for de/serialization of data within a frame. + * + * This is a DRAFT class, and may change without notice. + */ +@Draft +public final class ZNeedle +{ + private final ByteBuffer needle; // Read/write pointer for serialization + + public ZNeedle(ZFrame frame) + { + this(frame.getData()); + } + + private ZNeedle(byte[] data) + { + needle = ByteBuffer.wrap(data); + } + + private void checkAvailable(int size) + { + Utils.checkArgument(needle.position() + size <= needle.limit(), () -> "Unable to handle " + size + " bytes"); + } + + private void forward(int size) + { + needle.position(needle.position() + size); + } + + private T get(BiFunction getter, int size) + { + T value = getter.apply(needle, needle.position()); + forward(size); + return value; + } + + // Put a 1-byte number to the frame + public void putNumber1(int value) + { + checkAvailable(1); + needle.put((byte) (value & 0xff)); + } + + // Get a 1-byte number to the frame + // then make it unsigned + public int getNumber1() + { + checkAvailable(1); + int value = needle.get(needle.position()) & 0xff; + forward(1); + return value; + } + + // Put a 2-byte number to the frame + public void putNumber2(int value) + { + checkAvailable(2); + Wire.putUInt16(needle, value); + } + + // Get a 2-byte number to the frame + public int getNumber2() + { + checkAvailable(2); + return get(Wire::getUInt16, 2); + } + + // Put a 4-byte number to the frame + public void putNumber4(int value) + { + checkAvailable(4); + Wire.putUInt32(needle, value); + } + + // Get a 4-byte number to the frame + // then make it unsigned + public int getNumber4() + { + checkAvailable(4); + return get(Wire::getUInt32, 4); + } + + // Put a 8-byte number to the frame + public void putNumber8(long value) + { + checkAvailable(8); + Wire.putUInt64(needle, value); + } + + // Get a 8-byte number to the frame + public long getNumber8() + { + checkAvailable(8); + return get(Wire::getUInt64, 8); + } + + // Put a block to the frame + public void putBlock(byte[] value, int size) + { + needle.put(value, 0, size); + } + + public byte[] getBlock(int size) + { + checkAvailable(size); + byte[] value = new byte[size]; + needle.get(value); + + return value; + } + + // Put a string to the frame + public void putShortString(String value) + { + checkAvailable(value.length() + 1); + Wire.putShortString(needle, value); + } + + // Get a string from the frame + public String getShortString() + { + String value = Wire.getShortString(needle, needle.position()); + forward(value.length() + 1); + return value; + } + + public void putLongString(String value) + { + checkAvailable(value.length() + 4); + Wire.putLongString(needle, value); + } + + // Get a long string from the frame + public String getLongString() + { + String value = Wire.getLongString(needle, needle.position()); + forward(value.length() + 4); + return value; + } + + // Put a string to the frame + public void putString(String value) + { + if (value.length() > Byte.MAX_VALUE * 2 + 1) { + putLongString(value); + } + else { + putShortString(value); + } + } + + // Get a short string from the frame + public String getString() + { + return getShortString(); + } + + // Put a collection of strings to the frame + public void put(Collection elements) + { + if (elements == null) { + putNumber1(0); + } + else { + Utils.checkArgument(elements.size() < 256, "Collection has to be smaller than 256 elements"); + putNumber1(elements.size()); + elements.stream().forEach(this::putString); + } + } + + public List getList() + { + int size = getNumber1(); + return IntStream.range(0, size).mapToObj(idx -> getString()).collect(Collectors.toList()); + } + + // Put a map of strings to the frame + public void put(Map map) + { + if (map == null) { + putNumber1(0); + } + else { + Utils.checkArgument(map.size() < 256, "Map has to be smaller than 256 elements"); + putNumber1(map.size()); + Utils.checkArgument(map.keySet().stream().noneMatch(s -> s.contains("=")), "Keys cannot contain '=' sign"); + Utils.checkArgument(map.values().stream().noneMatch(s -> s.contains("=")), "Keys cannot contain '=' sign"); + map.entrySet().stream().map(entry -> entry.getKey() + "=" + entry.getValue()).forEach(this::putString); + } + } + + public Map getMap() + { + int size = getNumber1(); + return IntStream.range(0, size).mapToObj(idx -> { + String[] kv = getString().split("="); + assert (kv.length == 2); + return kv; + }).collect(Collectors.toMap(kv -> kv[0], kv -> kv[1])); + } + + @Override + public String toString() + { + return "ZNeedle [position=" + needle.position() + ", ceiling=" + needle.limit() + "]"; + } +} diff --git a/src/main/java/org/zeromq/proto/ZPicture.java b/src/main/java/org/zeromq/proto/ZPicture.java new file mode 100644 index 000000000..f2ab02eed --- /dev/null +++ b/src/main/java/org/zeromq/proto/ZPicture.java @@ -0,0 +1,431 @@ +package org.zeromq.proto; + +import java.util.regex.Pattern; + +import org.zeromq.ZFrame; +import org.zeromq.ZMQ; +import org.zeromq.ZMQ.Socket; +import org.zeromq.ZMQException; +import org.zeromq.ZMsg; + +import zmq.ZError; +import zmq.util.Draft; + +/** + * De/serialization of data within a message. + * + * This is a DRAFT class, and may change without notice. + */ +@Draft +public class ZPicture +{ + private static final Pattern FORMAT = Pattern.compile("[i1248sbcfz]*m?"); + private static final Pattern BINARY_FORMAT = Pattern.compile("[1248sSbcf]*m?"); + + /** + * Creates a binary encoded 'picture' message to the socket (or actor), so it can be sent. + * The arguments are encoded in a binary format that is compatible with zproto, and + * is designed to reduce memory allocations. + * + * @param picture The picture argument is a string that defines the + * type of each argument. Supports these argument types: + * + * + * + * + * + * + * + * + * + * + * + * + * + *
patternjava typezproto type
1inttype = "number" size = "1"
2inttype = "number" size = "2"
4longtype = "number" size = "3"
8longtype = "number" size = "4"
sString, 0-255 charstype = "string"
SString, 0-2^32-1 charstype = "longstr"
bbyte[], 0-2^32-1 bytestype = "chunk"
cbyte[], 0-2^32-1 bytestype = "chunk"
fZFrametype = "frame"
mZMsgtype = "msg" Has to be the last element of the picture
+ * @param args Arguments according to the picture + * @return true when it has been queued on the socket and ØMQ has assumed responsibility for the message. + * This does not indicate that the message has been transmitted to the network. + * @apiNote Does not change or take ownership of any arguments. + */ + @Draft + public ZMsg msgBinaryPicture(String picture, Object... args) + { + if (!BINARY_FORMAT.matcher(picture).matches()) { + throw new ZMQException(picture + " is not in expected binary format " + BINARY_FORMAT.pattern(), + ZError.EPROTO); + } + ZMsg msg = new ZMsg(); + + // Pass 1: calculate total size of data frame + int frameSize = 0; + for (int index = 0; index < picture.length(); index++) { + char pattern = picture.charAt(index); + switch (pattern) { + case '1': { + frameSize += 1; + break; + } + case '2': { + frameSize += 2; + break; + } + case '4': { + frameSize += 4; + break; + } + case '8': { + frameSize += 8; + break; + } + case 's': { + String string = (String) args[index]; + frameSize += 1 + (string != null ? string.getBytes(ZMQ.CHARSET).length : 0); + break; + } + case 'S': { + String string = (String) args[index]; + frameSize += 4 + (string != null ? string.getBytes(ZMQ.CHARSET).length : 0); + break; + } + case 'b': + case 'c': { + byte[] block = (byte[]) args[index]; + frameSize += 4 + block.length; + break; + } + case 'f': { + ZFrame frame = (ZFrame) args[index]; + msg.add(frame); + break; + } + case 'm': { + ZMsg other = (ZMsg) args[index]; + if (other == null) { + msg.add(new ZFrame((byte[]) null)); + } + else { + other.forEach(msg::add); + } + break; + } + default: + assert (false) : "invalid picture element '" + pattern + "'"; + } + } + + // Pass 2: encode data into data frame + ZFrame frame = new ZFrame(new byte[frameSize]); + ZNeedle needle = new ZNeedle(frame); + for (int index = 0; index < picture.length(); index++) { + char pattern = picture.charAt(index); + switch (pattern) { + case '1': { + needle.putNumber1((int) args[index]); + break; + } + case '2': { + needle.putNumber2((int) args[index]); + break; + } + case '4': { + needle.putNumber4((int) args[index]); + break; + } + case '8': { + needle.putNumber8((long) args[index]); + break; + } + case 's': { + needle.putString((String) args[index]); + break; + } + case 'S': { + needle.putLongString((String) args[index]); + break; + } + case 'b': + case 'c': { + byte[] block = (byte[]) args[index]; + needle.putNumber4(block.length); + needle.putBlock(block, block.length); + break; + } + case 'f': + case 'm': + break; + default: + assert (false) : "invalid picture element '" + pattern + "'"; + } + } + msg.addFirst(frame); + return msg; + } + + @Draft + public boolean sendBinaryPicture(Socket socket, String picture, Object... args) + { + return msgBinaryPicture(picture, args).send(socket); + } + + /** + * Receive a binary encoded 'picture' message from the socket (or actor). + * This method is similar to {@link org.zeromq.ZMQ.Socket#recv()}, except the arguments are encoded + * in a binary format that is compatible with zproto, and is designed to + * reduce memory allocations. + * + * @param picture The picture argument is a string that defines + * the type of each argument. See {@link #sendBinaryPicture(Socket, String, Object...)} + * for the supported argument types. + * @return the picture elements as object array + **/ + @Draft + public Object[] recvBinaryPicture(Socket socket, final String picture) + { + if (!BINARY_FORMAT.matcher(picture).matches()) { + throw new ZMQException(picture + " is not in expected binary format " + BINARY_FORMAT.pattern(), + ZError.EPROTO); + } + ZFrame frame = ZFrame.recvFrame(socket); + if (frame == null) { + return null; + } + // Get the data frame + ZNeedle needle = new ZNeedle(frame); + + Object[] results = new Object[picture.length()]; + for (int index = 0; index < picture.length(); index++) { + char pattern = picture.charAt(index); + switch (pattern) { + case '1': { + results[index] = needle.getNumber1(); + break; + } + case '2': { + results[index] = needle.getNumber2(); + break; + } + case '4': { + results[index] = needle.getNumber4(); + break; + } + case '8': { + results[index] = needle.getNumber8(); + break; + } + case 's': { + results[index] = needle.getString(); + break; + } + case 'S': { + results[index] = needle.getLongString(); + break; + } + case 'b': + case 'c': { + int size = needle.getNumber4(); + results[index] = needle.getBlock(size); + break; + } + case 'f': { + // Get next frame off socket + results[index] = ZFrame.recvFrame(socket); + break; + } + case 'm': { + // Get zero or more remaining frames + results[index] = ZMsg.recvMsg(socket); + break; + } + default: + assert (false) : "invalid picture element '" + pattern + "'"; + } + } + return results; + } + + /** + * Queues a 'picture' message to the socket (or actor), so it can be sent. + * + * @param picture The picture is a string that defines the type of each frame. + * This makes it easy to send a complex multiframe message in + * one call. The picture can contain any of these characters, + * each corresponding to zero or one arguments: + * + * + * + * + * + * + * + * + * + * + * + * + * + * + *
i = int (stores signed integer)
1 = byte (stores 8-bit unsigned integer)
2 = int (stores 16-bit unsigned integer)
4 = long (stores 32-bit unsigned integer)
8 = long (stores 64-bit unsigned integer)
s = String
b = byte[]
c = byte[]
f = ZFrame
m = ZMsg (sends all frames in the ZMsg)Has to be the last element of the picture
z = sends zero-sized frame (0 arguments)
+ * Note that s, b, f and m are encoded the same way and the choice is + * offered as a convenience to the sender, which may or may not already + * have data in a ZFrame or ZMsg. Does not change or take ownership of + * any arguments. + * + * Also see {@link #recvPicture(Socket, String)}} how to recv a + * multiframe picture. + * @param args Arguments according to the picture + * @return true if successful, false if sending failed for any reason + */ + @Draft + public boolean sendPicture(Socket socket, String picture, Object... args) + { + if (!FORMAT.matcher(picture).matches()) { + throw new ZMQException(picture + " is not in expected format " + FORMAT.pattern(), ZError.EPROTO); + } + ZMsg msg = new ZMsg(); + for (int pictureIndex = 0, argIndex = 0; pictureIndex < picture.length(); pictureIndex++, argIndex++) { + char pattern = picture.charAt(pictureIndex); + switch (pattern) { + case 'i': { + msg.add(String.format("%d", (int) args[argIndex])); + break; + } + case '1': { + msg.add(String.format("%d", (0xff) & (int) args[argIndex])); + break; + } + case '2': { + msg.add(String.format("%d", (0xffff) & (int) args[argIndex])); + break; + } + case '4': { + msg.add(String.format("%d", (0xffffffff) & (int) args[argIndex])); + break; + } + case '8': { + msg.add(String.format("%d", (long) args[argIndex])); + break; + } + case 's': { + msg.add((String) args[argIndex]); + break; + } + case 'b': + case 'c': { + msg.add((byte[]) args[argIndex]); + break; + } + case 'f': { + msg.add((ZFrame) args[argIndex]); + break; + } + case 'm': { + ZMsg msgParm = (ZMsg) args[argIndex]; + while (msgParm.size() > 0) { + msg.add(msgParm.pop()); + } + break; + } + case 'z': { + msg.add((byte[]) null); + argIndex--; + break; + } + default: + assert (false) : "invalid picture element '" + pattern + "'"; + } + } + return msg.send(socket, false); + } + + /** + * Receive a 'picture' message to the socket (or actor). + * + * + * @param picture The picture is a string that defines the type of each frame. + * This makes it easy to recv a complex multiframe message in + * one call. The picture can contain any of these characters, + * each corresponding to zero or one elements in the result: + * + * + * + * + * + * + * + * + * + * + * + * + * + *
i = int (stores signed integer)
1 = int (stores 8-bit unsigned integer)
2 = int (stores 16-bit unsigned integer)
4 = long (stores 32-bit unsigned integer)
8 = long (stores 64-bit unsigned integer)
s = String
b = byte[]
f = ZFrame (creates zframe)
m = ZMsg (creates a zmsg with the remaing frames)
z = null, asserts empty frame (0 arguments)
+ * + * Also see {@link #sendPicture(Socket, String, Object...)} how to send a + * multiframe picture. + * + * @return the picture elements as object array + */ + @Draft + public Object[] recvPicture(Socket socket, String picture) + { + if (!FORMAT.matcher(picture).matches()) { + throw new ZMQException(picture + " is not in expected format " + FORMAT.pattern(), ZError.EPROTO); + } + Object[] elements = new Object[picture.length()]; + for (int index = 0; index < picture.length(); index++) { + char pattern = picture.charAt(index); + switch (pattern) { + case 'i': { + elements[index] = Integer.valueOf(socket.recvStr()); + break; + } + case '1': { + elements[index] = (0xff) & Integer.valueOf(socket.recvStr()); + break; + } + case '2': { + elements[index] = (0xffff) & Integer.valueOf(socket.recvStr()); + break; + } + case '4': { + elements[index] = (0xffffffff) & Integer.valueOf(socket.recvStr()); + break; + } + case '8': { + elements[index] = Long.valueOf(socket.recvStr()); + break; + } + case 's': { + elements[index] = socket.recvStr(); + break; + } + case 'b': + case 'c': { + elements[index] = socket.recv(); + break; + } + case 'f': { + elements[index] = ZFrame.recvFrame(socket); + + break; + } + case 'm': { + elements[index] = ZMsg.recvMsg(socket); + break; + } + case 'z': { + ZFrame zeroFrame = ZFrame.recvFrame(socket); + if (zeroFrame == null || zeroFrame.size() > 0) { + throw new ZMQException("zero frame is not empty", ZError.EPROTO); + } + elements[index] = new ZFrame((byte[]) null); + break; + } + default: + assert (false) : "invalid picture element '" + pattern + "'"; + } + } + return elements; + } +} diff --git a/src/main/java/org/zeromq/proto/package-info.java b/src/main/java/org/zeromq/proto/package-info.java new file mode 100644 index 000000000..6c0e86d37 --- /dev/null +++ b/src/main/java/org/zeromq/proto/package-info.java @@ -0,0 +1,4 @@ +/** + *

Provides utility classes for ØMQ zproto.

+ */ +package org.zeromq.proto; diff --git a/src/main/java/zmq/io/mechanism/curve/CurveClientMechanism.java b/src/main/java/zmq/io/mechanism/curve/CurveClientMechanism.java index 730289002..baa05f3a4 100644 --- a/src/main/java/zmq/io/mechanism/curve/CurveClientMechanism.java +++ b/src/main/java/zmq/io/mechanism/curve/CurveClientMechanism.java @@ -176,7 +176,7 @@ public Msg decode(Msg msg) messageNonce.put("CurveZMQMESSAGES".getBytes(ZMQ.CHARSET)); msg.transfer(messageNonce, 8, 8); - long nonce = Wire.getUInt64(msg, 8); + long nonce = msg.getLong(8); if (nonce <= cnPeerNonce) { errno.set(ZError.EPROTO); @@ -383,7 +383,7 @@ private int processReady(Msg msg) readyNonce.put("CurveZMQREADY---".getBytes(ZMQ.CHARSET)); msg.transfer(readyNonce, 6, 8); - cnPeerNonce = Wire.getUInt64(msg, 6); + cnPeerNonce = msg.getLong(6); int rc = cryptoBox.openAfternm(readyPlaintext, readyBox, clen, readyNonce, cnPrecom); if (rc != 0) { diff --git a/src/main/java/zmq/io/mechanism/curve/CurveServerMechanism.java b/src/main/java/zmq/io/mechanism/curve/CurveServerMechanism.java index 214c026d5..bef2d16c1 100644 --- a/src/main/java/zmq/io/mechanism/curve/CurveServerMechanism.java +++ b/src/main/java/zmq/io/mechanism/curve/CurveServerMechanism.java @@ -176,7 +176,7 @@ public Msg decode(Msg msg) messageNonce.put("CurveZMQMESSAGEC".getBytes(ZMQ.CHARSET)); msg.transfer(messageNonce, 8, 8); - long nonce = Wire.getUInt64(msg, 8); + long nonce = msg.getLong(8); if (nonce <= cnPeerNonce) { errno.set(ZError.EPROTO); @@ -265,7 +265,7 @@ private int processHello(Msg msg) helloNonce.put("CurveZMQHELLO---".getBytes(ZMQ.CHARSET)); msg.transfer(helloNonce, 112, 8); - cnPeerNonce = Wire.getUInt64(msg, 112); + cnPeerNonce = msg.getLong(112); helloBox.position(Curve.Size.BOXZERO.bytes()); msg.transfer(helloBox, 120, 80); @@ -383,7 +383,7 @@ private int processInitiate(Msg msg) initiateNonce.put("CurveZMQINITIATE".getBytes(ZMQ.CHARSET)); msg.transfer(initiateNonce, 105, 8); - cnPeerNonce = Wire.getUInt64(msg, 105); + cnPeerNonce = msg.getLong(105); rc = cryptoBox.open(initiatePlaintext, initiateBox, clen, initiateNonce, cnClient, cnSecret); if (rc != 0) { diff --git a/src/main/java/zmq/socket/reqrep/Req.java b/src/main/java/zmq/socket/reqrep/Req.java index 6242fefed..7730ef988 100644 --- a/src/main/java/zmq/socket/reqrep/Req.java +++ b/src/main/java/zmq/socket/reqrep/Req.java @@ -138,7 +138,7 @@ protected Msg xrecv() if (msg == null) { return null; } - if (!msg.hasMore() || msg.size() != 4 || Wire.getUInt32(msg, 0) != requestId) { + if (!msg.hasMore() || msg.size() != 4 || msg.getInt(0) != requestId) { // Skip the remaining frames and try the next message while (msg.hasMore()) { msg = recvReplyPipe(); diff --git a/src/main/java/zmq/util/Utils.java b/src/main/java/zmq/util/Utils.java index d0eb1b6a5..5e5f968f0 100644 --- a/src/main/java/zmq/util/Utils.java +++ b/src/main/java/zmq/util/Utils.java @@ -9,6 +9,7 @@ import java.nio.channels.SelectableChannel; import java.nio.channels.SocketChannel; import java.security.SecureRandom; +import java.util.function.Supplier; import zmq.io.net.Address; import zmq.io.net.tcp.TcpUtils; @@ -164,9 +165,14 @@ public static String dump(ByteBuffer buffer, int pos, int limit) } public static void checkArgument(boolean expression, String errorMessage) + { + checkArgument(expression, () -> errorMessage); + } + + public static void checkArgument(boolean expression, Supplier errorMessage) { if (!expression) { - throw new IllegalArgumentException(errorMessage); + throw new IllegalArgumentException(errorMessage.get()); } } } diff --git a/src/main/java/zmq/util/Wire.java b/src/main/java/zmq/util/Wire.java index a1aef3e8f..1d33cb0bd 100644 --- a/src/main/java/zmq/util/Wire.java +++ b/src/main/java/zmq/util/Wire.java @@ -1,6 +1,9 @@ package zmq.util; import java.nio.ByteBuffer; +import java.nio.charset.Charset; + +import org.zeromq.ZMQ; import zmq.Msg; @@ -12,35 +15,59 @@ private Wire() { } + private static int getUInt8(ByteBuffer buf, int offset) + { + return buf.get(offset) & 0xff; + } + + private static ByteBuffer putUInt8(ByteBuffer buf, int value) + { + buf.put((byte) (value & 0xff)); + return buf; + } + + // 2 bytes value public static int getUInt16(byte[] bytes) { return (bytes[0] & 0xff) << 8 | bytes[1] & 0xff; } + public static int getUInt16(ByteBuffer buf, int offset) + { + return (buf.get(offset) & 0xff) << 8 | (buf.get(offset + 1) & 0xff); + } + public static byte[] putUInt16(int value) { assert (value >= 0); // it has to be an *unsigned* int byte[] bytes = new byte[2]; bytes[0] = (byte) ((value >>> 8) & 0xff); - bytes[1] = (byte) (value & 0xff); + bytes[1] = (byte) ((value & 0xff)); return bytes; } - public static int getUInt32(ByteBuffer buf) + public static Msg putUInt16(Msg msg, int value) { - return getUInt32(buf, 0); + msg.put((byte) ((value >>> 8) & 0xff)); + msg.put((byte) ((value & 0xff))); + + return msg; } - public static int getUInt32(Msg msg, int offset) + public static ByteBuffer putUInt16(ByteBuffer buf, int value) { - return msg.getInt(offset); + buf.put((byte) ((value >>> 8) & 0xff)); + buf.put((byte) ((value & 0xff))); + + return buf; } - public static int getUInt16(ByteBuffer buf, int offset) + // 4 bytes value + public static int getUInt32(ByteBuffer buf) { - return (buf.get(offset) & 0xff) << 8 | (buf.get(offset + 1) & 0xff); + return getUInt32(buf, 0); } public static int getUInt32(ByteBuffer buf, int offset) @@ -49,18 +76,25 @@ public static int getUInt32(ByteBuffer buf, int offset) | (buf.get(offset + 3) & 0xff); } + public static int getUInt32(Msg msg, int offset) + { + return msg.getInt(offset); + } + public static int getUInt32(byte[] bytes, int offset) { return (bytes[offset] & 0xff) << 24 | (bytes[offset + 1] & 0xff) << 16 | (bytes[offset + 2] & 0xff) << 8 | (bytes[offset + 3] & 0xff); } - public static Msg putUInt16(Msg msg, int value) + public static ByteBuffer putUInt32(ByteBuffer buf, int value) { - msg.put((byte) ((value >>> 8) & 0xff)); - msg.put((byte) ((value & 0xff))); + buf.put((byte) ((value >>> 24) & 0xff)); + buf.put((byte) ((value >>> 16) & 0xff)); + buf.put((byte) ((value >>> 8) & 0xff)); + buf.put((byte) ((value & 0xff))); - return msg; + return buf; } public static byte[] putUInt32(int value) @@ -76,16 +110,6 @@ public static byte[] putUInt32(int value) return bytes; } - public static ByteBuffer putUInt32(ByteBuffer buf, int value) - { - buf.put((byte) ((value >>> 24) & 0xff)); - buf.put((byte) ((value >>> 16) & 0xff)); - buf.put((byte) ((value >>> 8) & 0xff)); - buf.put((byte) ((value & 0xff))); - - return buf; - } - public static Msg putUInt32(Msg msg, int value) { msg.put((byte) ((value >>> 24) & 0xff)); @@ -96,6 +120,7 @@ public static Msg putUInt32(Msg msg, int value) return msg; } + // 8 bytes value public static ByteBuffer putUInt64(ByteBuffer buf, long value) { buf.put((byte) ((value >>> 56) & 0xff)); @@ -122,4 +147,63 @@ public static long getUInt64(Msg msg, int offset) { return msg.getLong(offset); } + + // strings + public static void putShortString(ByteBuffer buf, String value) + { + putShortString(ZMQ.CHARSET, buf, value); + } + + public static String getShortString(ByteBuffer buf, int offset) + { + return getShortString(ZMQ.CHARSET, buf, offset); + } + + public static void putShortString(Charset charset, ByteBuffer buf, String value) + { + int length = value.length(); + Utils.checkArgument(length < 256, "String must be strictly smaller than 256 characters"); + putUInt8(buf, length); + buf.put(value.getBytes(charset)); + } + + public static String getShortString(Charset charset, ByteBuffer buf, int offset) + { + int length = getUInt8(buf, offset); + return extractString(charset, buf, offset, length, 1); + } + + public static void putLongString(ByteBuffer buf, String value) + { + putLongString(ZMQ.CHARSET, buf, value); + } + + public static String getLongString(ByteBuffer buf, int offset) + { + return getLongString(ZMQ.CHARSET, buf, offset); + } + + public static void putLongString(Charset charset, ByteBuffer buf, String value) + { + int length = value.length(); + Utils.checkArgument(length < 0x7fffffff, "String must be smaller than 2^31-1 characters"); + Wire.putUInt32(buf, length); + buf.put(value.getBytes(charset)); + } + + public static String getLongString(Charset charset, ByteBuffer buf, int offset) + { + int length = Wire.getUInt32(buf, offset); + return extractString(charset, buf, offset, length, 4); + } + + private static String extractString(Charset charset, ByteBuffer buf, int offset, int length, int sizeOfSize) + { + byte[] text = new byte[length]; + int old = buf.position(); + buf.position(offset + sizeOfSize); + buf.get(text, 0, length); + buf.position(old); + return new String(text, charset); + } } diff --git a/src/test/java/org/zeromq/TestZMQ.java b/src/test/java/org/zeromq/TestZMQ.java index fc0b01074..d3aa898a3 100644 --- a/src/test/java/org/zeromq/TestZMQ.java +++ b/src/test/java/org/zeromq/TestZMQ.java @@ -1,29 +1,30 @@ package org.zeromq; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.util.Arrays; + import org.junit.After; import org.junit.Before; import org.junit.Test; import org.zeromq.ZMQ.Context; import org.zeromq.ZMQ.Socket; import org.zeromq.ZMQ.Socket.Mechanism; + import zmq.ZError; import zmq.msg.MsgAllocator; import zmq.msg.MsgAllocatorDirect; import zmq.util.Errno; -import java.io.IOException; -import java.util.Arrays; - -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.CoreMatchers.not; -import static org.hamcrest.CoreMatchers.notNullValue; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.fail; - public class TestZMQ { private Context ctx; @@ -185,21 +186,21 @@ public void testSocketSendRecvPicture() msg.add("Hello"); msg.add("World"); rc = push.sendPicture( - picture, - 255, - 65535, - 4294967295L, - Long.MAX_VALUE, - "Hello World", - "ABC".getBytes(ZMQ.CHARSET), - new ZFrame("My frame"), - msg); + picture, + 255, + 65535, + 429496729, + Long.MAX_VALUE, + "Hello World", + "ABC".getBytes(ZMQ.CHARSET), + new ZFrame("My frame"), + msg); assertThat(rc, is(true)); Object[] objects = pull.recvPicture(picture); assertThat(objects[0], is(equalTo(255))); assertThat(objects[1], is(equalTo(65535))); - assertThat(objects[2], is(equalTo(4294967295L))); + assertThat(objects[2], is(equalTo(429496729))); assertThat(objects[3], is(equalTo(Long.MAX_VALUE))); assertThat(objects[4], is(equalTo("Hello World"))); assertThat(objects[5], is(equalTo("ABC".getBytes(zmq.ZMQ.CHARSET)))); @@ -235,22 +236,22 @@ public void testSocketSendRecvBinaryPicture() msg.add("Hello"); msg.add("World"); rc = push.sendBinaryPicture( - picture, - 255, - 65535, - 429496729L, - Long.MAX_VALUE, - "Hello World", - "Hello cruel World!", - "ABC".getBytes(ZMQ.CHARSET), - new ZFrame("My frame"), - msg); + picture, + 255, + 65535, + 429496729, + Long.MAX_VALUE, + "Hello World", + "Hello cruel World!", + "ABC".getBytes(ZMQ.CHARSET), + new ZFrame("My frame"), + msg); assertThat(rc, is(true)); Object[] objects = pull.recvBinaryPicture(picture); - assertThat(objects[0], is(equalTo((byte) 255))); + assertThat(objects[0], is(equalTo(255))); assertThat(objects[1], is(equalTo(65535))); - assertThat(objects[2], is(equalTo(429496729L))); + assertThat(objects[2], is(equalTo(429496729))); assertThat(objects[3], is(equalTo(Long.MAX_VALUE))); assertThat(objects[4], is(equalTo("Hello World"))); assertThat(objects[5], is(equalTo("Hello cruel World!"))); diff --git a/src/test/java/org/zeromq/proto/ZNeedleTest.java b/src/test/java/org/zeromq/proto/ZNeedleTest.java new file mode 100644 index 000000000..cdf130479 --- /dev/null +++ b/src/test/java/org/zeromq/proto/ZNeedleTest.java @@ -0,0 +1,234 @@ +package org.zeromq.proto; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; + +import org.junit.Test; +import org.zeromq.ZFrame; +import org.zeromq.ZMQ; + +public class ZNeedleTest +{ + @Test + public void testGetByte() + { + ZFrame frame = new ZFrame(new byte[1]); + ZNeedle needle = new ZNeedle(frame); + assertThat(needle.getNumber1(), is(0)); + } + + @Test(expected = IllegalArgumentException.class) + public void testGetIncorrectByte() + { + ZFrame frame = new ZFrame(new byte[0]); + ZNeedle needle = new ZNeedle(frame); + needle.getNumber1(); + } + + @Test + public void testGetShort() + { + ZFrame frame = new ZFrame(new byte[2]); + ZNeedle needle = new ZNeedle(frame); + assertThat(needle.getNumber2(), is(0)); + } + + @Test(expected = IllegalArgumentException.class) + public void testGetIncorrectShort() + { + ZFrame frame = new ZFrame(new byte[1]); + ZNeedle needle = new ZNeedle(frame); + needle.getNumber2(); + } + + @Test + public void testGetInt() + { + ZFrame frame = new ZFrame(new byte[4]); + ZNeedle needle = new ZNeedle(frame); + assertThat(needle.getNumber4(), is(0)); + } + + @Test(expected = IllegalArgumentException.class) + public void testGetIncorrectInt() + { + ZFrame frame = new ZFrame(new byte[3]); + ZNeedle needle = new ZNeedle(frame); + needle.getNumber4(); + } + + @Test + public void testGetLong() + { + ZFrame frame = new ZFrame(new byte[8]); + ZNeedle needle = new ZNeedle(frame); + assertThat(needle.getNumber8(), is(0L)); + } + + @Test(expected = IllegalArgumentException.class) + public void testGetIncorrectLong() + { + ZFrame frame = new ZFrame(new byte[7]); + ZNeedle needle = new ZNeedle(frame); + needle.getNumber8(); + } + + @Test + public void testSimpleValues() + { + ZFrame frame = new ZFrame(new byte[15 + 26 + 1 + 4 + 8 + 2]); + ZNeedle needle = new ZNeedle(frame); + + needle.putNumber1(1); + needle.putNumber2(124); + needle.putNumber4(3245678); + needle.putNumber8(87654225); + byte[] block = new byte[10]; + Arrays.fill(block, (byte) 2); + needle.putBlock(block, 8); + needle.putShortString("abcdefg"); + needle.putLongString("hijklmnopqrstuvwxyz"); + needle.putNumber2(421); + + needle = new ZNeedle(frame); + assertThat(needle.getNumber1(), is(1)); + assertThat(needle.getNumber2(), is(124)); + assertThat(needle.getNumber4(), is(3245678)); + assertThat(needle.getNumber8(), is(87654225L)); + block = new byte[8]; + Arrays.fill(block, (byte) 2); + assertThat(needle.getBlock(8), is(block)); + assertThat(needle.getShortString(), is("abcdefg")); + assertThat(needle.getLongString(), is("hijklmnopqrstuvwxyz")); + assertThat(needle.getNumber2(), is(421)); + + String s = needle.toString(); + assertThat(s, is("ZNeedle [position=56, ceiling=56]")); + } + + @Test + public void testLongString() + { + ZFrame frame = new ZFrame(new byte[300]); + ZNeedle needle = new ZNeedle(frame); + + byte[] bytes = new byte[294]; + Arrays.fill(bytes, (byte) 'A'); + String string = new String(bytes, ZMQ.CHARSET); + needle.putString(string); + needle.putNumber2(42); + + needle = new ZNeedle(frame); + assertThat(needle.getLongString(), is(string)); + assertThat(needle.getNumber2(), is(42)); + } + + @Test + public void testList() + { + ZFrame frame = new ZFrame(new byte[26]); + ZNeedle needle = new ZNeedle(frame); + + needle.put(Arrays.asList("1", "2", "34", "567")); + needle.put(Arrays.asList("864", "43", "9", "0")); + + needle = new ZNeedle(frame); + assertThat(needle.getList(), is(Arrays.asList("1", "2", "34", "567"))); + assertThat(needle.getList(), is(Arrays.asList("864", "43", "9", "0"))); + } + + @Test + public void testNullList() + { + ZFrame frame = new ZFrame(new byte[1]); + ZNeedle needle = new ZNeedle(frame); + + needle.put((List) null); + + needle = new ZNeedle(frame); + assertThat(needle.getList(), is(Collections.emptyList())); + } + + @Test + public void testEmptyList() + { + ZFrame frame = new ZFrame(new byte[1]); + ZNeedle needle = new ZNeedle(frame); + + needle.put(Collections.emptyList()); + + needle = new ZNeedle(frame); + assertThat(needle.getList(), is(Collections.emptyList())); + } + + @Test + public void testMap() + { + ZFrame frame = new ZFrame(new byte[2 * (1 + 10 + 4 + 7)]); + ZNeedle needle = new ZNeedle(frame); + + Map map = new HashMap<>(); + map.put("key", "value"); + map.put("1", "2"); + map.put("34", "567"); + needle.put(map); + needle.put(map); + + needle = new ZNeedle(frame); + assertThat(needle.getMap(), is(map)); + assertThat(new HashSet<>(needle.getList()), is(new HashSet<>(Arrays.asList("key=value", "1=2", "34=567")))); + } + + @Test + public void testNullMap() + { + ZFrame frame = new ZFrame(new byte[1]); + ZNeedle needle = new ZNeedle(frame); + + needle.put((Map) null); + + needle = new ZNeedle(frame); + assertThat(needle.getMap(), is(Collections.emptyMap())); + } + + @Test + public void testEmptyMap() + { + ZFrame frame = new ZFrame(new byte[1]); + ZNeedle needle = new ZNeedle(frame); + + needle.put(Collections.emptyMap()); + + needle = new ZNeedle(frame); + assertThat(needle.getMap(), is(Collections.emptyMap())); + } + + @Test(expected = IllegalArgumentException.class) + public void testMapIncorrectKey() + { + ZFrame frame = new ZFrame(new byte[(1 + 10)]); + ZNeedle needle = new ZNeedle(frame); + + Map map = new HashMap<>(); + map.put("ke=", "value"); + needle.put(map); + } + + @Test(expected = IllegalArgumentException.class) + public void testMapIncorrectValue() + { + ZFrame frame = new ZFrame(new byte[(1 + 10)]); + ZNeedle needle = new ZNeedle(frame); + + Map map = new HashMap<>(); + map.put("key", "=alue"); + needle.put(map); + } +} diff --git a/src/test/java/org/zeromq/proto/ZPictureTest.java b/src/test/java/org/zeromq/proto/ZPictureTest.java new file mode 100644 index 000000000..cc5678d26 --- /dev/null +++ b/src/test/java/org/zeromq/proto/ZPictureTest.java @@ -0,0 +1,174 @@ +package org.zeromq.proto; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +import org.junit.Test; +import org.zeromq.SocketType; +import org.zeromq.ZFrame; +import org.zeromq.ZMQ; +import org.zeromq.ZMQ.Context; +import org.zeromq.ZMQ.Socket; +import org.zeromq.ZMQException; +import org.zeromq.ZMsg; + +public class ZPictureTest +{ + private final ZPicture pic = new ZPicture(); + + @Test(expected = ZMQException.class) + public void testInvalidBinaryPictureFormat() + { + String picture = "a"; + pic.msgBinaryPicture(picture, 255); + } + + @Test(expected = ZMQException.class) + public void testSendInvalidPictureFormat() + { + String picture = " "; + pic.sendPicture(null, picture, 255); + } + + @Test(expected = ZMQException.class) + public void testReceiveInvalidPictureFormat() + { + String picture = "x"; + pic.recvPicture(null, picture); + } + + @Test(expected = ZMQException.class) + public void testInvalidPictureMsgNotInTheEnd() + { + String picture = "m1"; + + ZMsg msg = new ZMsg().push("Hello"); + pic.msgBinaryPicture(picture, msg, 255); + } + + @Test(expected = ZMQException.class) + public void testReceiveInvalidPictureMsgNotInTheEnd() + { + String picture = "m1"; + pic.recvBinaryPicture(null, picture); + } + + @Test + public void testValidPictureNullMsgInTheEnd() + { + String picture = "fm"; + + ZMsg msg = pic.msgBinaryPicture(picture, new ZFrame("My frame"), null); + assertThat(msg.getLast().size(), is(0)); + } + + @Test + public void testSocketSendRecvPicture() + { + Context context = ZMQ.context(1); + + Socket push = context.socket(SocketType.PUSH); + Socket pull = context.socket(SocketType.PULL); + + boolean rc = pull.setReceiveTimeOut(50); + assertThat(rc, is(true)); + int port = push.bindToRandomPort("tcp://127.0.0.1"); + rc = pull.connect("tcp://127.0.0.1:" + port); + assertThat(rc, is(true)); + + String picture = "i1248sbcfzm"; + + ZMsg msg = new ZMsg(); + msg.add("Hello"); + msg.add("World"); + rc = pic.sendPicture( + push, + picture, + -456, + 255, + 65535, + 429496729, + Long.MAX_VALUE, + "Hello World", + "ABC".getBytes(ZMQ.CHARSET), + "DEF".getBytes(ZMQ.CHARSET), + new ZFrame("My frame"), + msg); + assertThat(rc, is(true)); + + Object[] objects = pic.recvPicture(pull, picture); + assertThat(objects[0], is(-456)); + assertThat(objects[1], is(255)); + assertThat(objects[2], is(65535)); + assertThat(objects[3], is(429496729)); + assertThat(objects[4], is(Long.MAX_VALUE)); + assertThat(objects[5], is("Hello World")); + assertThat(objects[6], is("ABC".getBytes(zmq.ZMQ.CHARSET))); + assertThat(objects[7], is("DEF".getBytes(zmq.ZMQ.CHARSET))); + assertThat(objects[8], is(equalTo(new ZFrame("My frame")))); + assertThat(objects[9], is(equalTo(new ZFrame((byte[]) null)))); + ZMsg expectedMsg = new ZMsg(); + expectedMsg.add("Hello"); + expectedMsg.add("World"); + assertThat(objects[10], is(equalTo(expectedMsg))); + + push.close(); + pull.close(); + context.term(); + } + + @Test + public void testSocketSendRecvBinaryPicture() + { + Context context = ZMQ.context(1); + + Socket push = context.socket(SocketType.PUSH); + Socket pull = context.socket(SocketType.PULL); + + boolean rc = pull.setReceiveTimeOut(50); + assertThat(rc, is(true)); + int port = push.bindToRandomPort("tcp://127.0.0.1"); + rc = pull.connect("tcp://127.0.0.1:" + port); + assertThat(rc, is(true)); + + String picture = "1248sSbcfm"; + + ZMsg msg = new ZMsg(); + msg.add("Hello"); + msg.add("World"); + rc = pic.sendBinaryPicture( + push, + picture, + 255, + 65535, + 429496729, + Long.MAX_VALUE, + "Hello World", + "Hello cruel World!", + "ABC".getBytes(ZMQ.CHARSET), + "DEF".getBytes(ZMQ.CHARSET), + new ZFrame("My frame"), + msg); + assertThat(rc, is(true)); + + Object[] objects = pic.recvBinaryPicture(pull, picture); + assertThat(objects[0], is(255)); + assertThat(objects[1], is(65535)); + assertThat(objects[2], is(429496729)); + assertThat(objects[3], is(Long.MAX_VALUE)); + assertThat(objects[4], is("Hello World")); + assertThat(objects[5], is("Hello cruel World!")); + assertThat(objects[6], is("ABC".getBytes(zmq.ZMQ.CHARSET))); + assertThat(objects[7], is("DEF".getBytes(zmq.ZMQ.CHARSET))); + assertThat(objects[8], is(equalTo(new ZFrame("My frame")))); + ZMsg expectedMsg = new ZMsg(); + expectedMsg.add("Hello"); + expectedMsg.add("World"); + assertThat(objects[9], is(equalTo(expectedMsg))); + + push.close(); + pull.close(); + context.term(); + } +} diff --git a/src/test/java/zmq/util/WireTest.java b/src/test/java/zmq/util/WireTest.java index 68f7af246..6b864c76c 100644 --- a/src/test/java/zmq/util/WireTest.java +++ b/src/test/java/zmq/util/WireTest.java @@ -4,6 +4,7 @@ import static org.junit.Assert.assertThat; import java.nio.ByteBuffer; +import java.util.Arrays; import org.junit.Test; @@ -87,4 +88,61 @@ private void testUnsignedLong(long expected) long actual = Wire.getUInt64(buf, 0); assertThat(actual, is(expected)); } + + @Test + public void testShortString() + { + testShortString("abcdef", 0); + testShortString("ghijklm", 3); + } + + @Test + public void testShortStringOutOfByteRange() + { + testShortString(string(Byte.MAX_VALUE + 1, 'C'), 0); + } + + @Test + public void testShortStringMax() + { + testShortString(string(255, 'C'), 0); + } + + @Test(expected = IllegalArgumentException.class) + public void testShortStringTooLong() + { + testShortString(string(256, 'B'), 0); + } + + @Test + public void testLongString() + { + testLongString("abcdef", 1); + testLongString(string(256, 'D'), 0); + } + + private void testShortString(String expected, int offset) + { + ByteBuffer buf = ByteBuffer.allocate(expected.length() + 1 + offset); + buf.position(offset); + Wire.putShortString(buf, expected); + String actual = Wire.getShortString(buf, offset); + assertThat(actual, is(expected)); + } + + private void testLongString(String expected, int offset) + { + ByteBuffer buf = ByteBuffer.allocate(expected.length() + 4 + offset); + buf.position(offset); + Wire.putLongString(buf, expected); + String actual = Wire.getLongString(buf, offset); + assertThat(actual, is(expected)); + } + + private String string(int length, char letter) + { + byte[] content = new byte[length]; + Arrays.fill(content, (byte) letter); + return new String(content); + } }