Skip to content

Commit

Permalink
Trie and MTrie improvements and increased coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
fredoboulo committed Sep 11, 2017
1 parent 4a0d226 commit 225a521
Show file tree
Hide file tree
Showing 8 changed files with 530 additions and 39 deletions.
2 changes: 1 addition & 1 deletion src/main/java/zmq/SocketBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

public abstract class SocketBase extends Own implements IPollEvents, Pipe.IPipeEvents
{
private class EndpointPipe
private static class EndpointPipe
{
private final Own endpoint;
private final Pipe pipe;
Expand Down
27 changes: 14 additions & 13 deletions src/main/java/zmq/socket/pubsub/Mtrie.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.util.HashSet;
import java.util.Set;

import zmq.Msg;
import zmq.pipe.Pipe;
import zmq.util.Utils;

Expand Down Expand Up @@ -32,19 +33,19 @@ public Mtrie()
next = null;
}

public boolean add(byte[] prefix, Pipe pipe)
final boolean addOnTop(Pipe pipe)
{
return addHelper(prefix, 0, 0, pipe);
return addHelper(null, 0, 0, pipe);
}

// Add key to the trie. Returns true if it's a new subscription
// rather than a duplicate.
public boolean add(byte[] prefix, int size, Pipe pipe)
public boolean add(Msg msg, Pipe pipe)
{
return addHelper(prefix, 1, size - 1, pipe);
return addHelper(msg, 1, msg.size() - 1, pipe);
}

private boolean addHelper(byte[] prefix, int start, int size, Pipe pipe)
private boolean addHelper(Msg msg, int start, int size, Pipe pipe)
{
// We are at the node corresponding to the prefix. We are done.
if (size == 0) {
Expand All @@ -56,7 +57,7 @@ private boolean addHelper(byte[] prefix, int start, int size, Pipe pipe)
return result;
}

byte c = prefix[start];
byte c = msg.get(start);
if (c < min || c >= min + count) {
// The character is out of range of currently handled
// characters. We have to extend the table.
Expand Down Expand Up @@ -94,15 +95,15 @@ else if (min < c) {
++liveNodes;
//alloc_assert (next.node);
}
return next[0].addHelper(prefix, start + 1, size - 1, pipe);
return next[0].addHelper(msg, start + 1, size - 1, pipe);
}
else {
if (next[c - min] == null) {
next[c - min] = new Mtrie();
++liveNodes;
//alloc_assert (next.table [c - min]);
}
return next[c - min].addHelper(prefix, start + 1, size - 1, pipe);
return next[c - min].addHelper(msg, start + 1, size - 1, pipe);
}
}

Expand Down Expand Up @@ -233,12 +234,12 @@ else if (newMin > min || newMax < min + count - 1) {

// Remove specific subscription from the trie. Return true is it was
// actually removed rather than de-duplicated.
public boolean rm(byte[] prefix, int size, Pipe pipe)
public boolean rm(Msg msg, Pipe pipe)
{
return rmHelper(prefix, 1, size - 1, pipe);
return rmHelper(msg, 1, msg.size() - 1, pipe);
}

private boolean rmHelper(byte[] prefix, int start, int size, Pipe pipe)
private boolean rmHelper(Msg msg, int start, int size, Pipe pipe)
{
if (size == 0) {
if (pipes != null) {
Expand All @@ -251,7 +252,7 @@ private boolean rmHelper(byte[] prefix, int start, int size, Pipe pipe)
return pipes == null;
}

byte c = prefix[start];
byte c = msg.get(start);
if (count == 0 || c < min || c >= min + count) {
return false;
}
Expand All @@ -262,7 +263,7 @@ private boolean rmHelper(byte[] prefix, int start, int size, Pipe pipe)
return false;
}

boolean ret = nextNode.rmHelper(prefix, start + 1, size - 1, pipe);
boolean ret = nextNode.rmHelper(msg, start + 1, size - 1, pipe);
if (nextNode.isRedundant()) {
assert (count > 0);

Expand Down
15 changes: 8 additions & 7 deletions src/main/java/zmq/socket/pubsub/Trie.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.nio.ByteBuffer;

import zmq.Msg;
import zmq.pipe.Pipe;
import zmq.util.Utils;

Expand Down Expand Up @@ -32,15 +33,15 @@ public Trie()

// Add key to the trie. Returns true if this is a new item in the trie
// rather than a duplicate.
public boolean add(byte[] prefix, int start, int size)
public boolean add(Msg msg, int start, int size)
{
// We are at the node corresponding to the prefix. We are done.
if (size == 0) {
++refcnt;
return refcnt == 1;
}

byte c = prefix[start];
byte c = msg.get(start);
if (c < min || c >= min + count) {
// The character is out of range of currently handled
// characters. We have to extend the table.
Expand Down Expand Up @@ -79,7 +80,7 @@ else if (min < c) {
++liveNodes;
assert (liveNodes == 1);
}
return next[0].add(prefix, start + 1, size - 1);
return next[0].add(msg, start + 1, size - 1);
}
else {
if (next[c - min] == null) {
Expand All @@ -88,7 +89,7 @@ else if (min < c) {
++liveNodes;
assert (liveNodes > 1);
}
return next[c - min].add(prefix, start + 1, size - 1);
return next[c - min].add(msg, start + 1, size - 1);
}
}

Expand All @@ -99,7 +100,7 @@ private Trie[] realloc(Trie[] table, int size, boolean ended)

// Remove key from the trie. Returns true if the item is actually
// removed from the trie.
public boolean rm(byte[] prefix, int start, int size)
public boolean rm(Msg msg, int start, int size)
{
// TODO: Shouldn't an error be reported if the key does not exist?

Expand All @@ -111,7 +112,7 @@ public boolean rm(byte[] prefix, int start, int size)
return refcnt == 0;
}

byte c = prefix[start];
byte c = msg.get(start);
if (count == 0 || c < min || c >= min + count) {
return false;
}
Expand All @@ -122,7 +123,7 @@ public boolean rm(byte[] prefix, int start, int size)
return false;
}

boolean ret = nextNode.rm(prefix, start + 1, size - 1);
boolean ret = nextNode.rm(msg, start + 1, size - 1);

// Prune redundant nodes
if (nextNode.isRedundant()) {
Expand Down
13 changes: 6 additions & 7 deletions src/main/java/zmq/socket/pubsub/XPub.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ protected void xattachPipe(Pipe pipe, boolean subscribeToAll)
// If subscribe_to_all_ is specified, the caller would like to subscribe
// to all data on this pipe, implicitly.
if (subscribeToAll) {
subscriptions.add(null, pipe);
subscriptions.addOnTop(pipe);
}

// The pipe is active when attached. Let's read the subscriptions from
Expand All @@ -96,20 +96,19 @@ protected void xreadActivated(Pipe pipe)
Msg sub;
while ((sub = pipe.read()) != null) {
// Apply the subscription to the trie
byte[] data = sub.data();
int size = sub.size();
if (size > 0 && (data[0] == 0 || data[0] == 1)) {
if (size > 0 && (sub.get(0) == 0 || sub.get(0) == 1)) {
boolean unique;
if (data[0] == 0) {
unique = subscriptions.rm(data, size, pipe);
if (sub.get(0) == 0) {
unique = subscriptions.rm(sub, pipe);
}
else {
unique = subscriptions.add(data, size, pipe);
unique = subscriptions.add(sub, pipe);
}

// If the subscription is not a duplicate, store it so that it can be
// passed to used on next recv call. (Unsubscribe is not verbose.)
if (options.type == ZMQ.ZMQ_XPUB && (unique || (data[0] > 0 && verbose))) {
if (options.type == ZMQ.ZMQ_XPUB && (unique || (sub.get(0) > 0 && verbose))) {
pendingData.add(Blob.createBlob(sub));
pendingFlags.add(0);
}
Expand Down
9 changes: 4 additions & 5 deletions src/main/java/zmq/socket/pubsub/XSub.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,20 +103,19 @@ protected void xhiccuped(Pipe pipe)
protected boolean xsend(Msg msg)
{
final int size = msg.size();
final byte[] data = msg.data();

if (size > 0 && data[0] == 1) {
if (size > 0 && msg.get(0) == 1) {
// Process subscribe message
// This used to filter out duplicate subscriptions,
// however this is already done on the XPUB side and
// doing it here as well breaks ZMQ_XPUB_VERBOSE
// when there are forwarding devices involved.
subscriptions.add(data, 1, size - 1);
subscriptions.add(msg, 1, size - 1);
return dist.sendToAll(msg);
}
else if (size > 0 && data[0] == 0) {
else if (size > 0 && msg.get(0) == 0) {
// Process unsubscribe message
if (subscriptions.rm(data, 1, size - 1)) {
if (subscriptions.rm(msg, 1, size - 1)) {
return dist.sendToAll(msg);
}
}
Expand Down
Loading

0 comments on commit 225a521

Please sign in to comment.