Skip to content

Commit

Permalink
Refactor and Debug
Browse files Browse the repository at this point in the history
Fixed multiple publisher/subscriber bug
  • Loading branch information
neocoretechs committed Feb 21, 2022
1 parent c3092bc commit 024f795
Show file tree
Hide file tree
Showing 20 changed files with 111 additions and 204 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@

*.class
*.jar
2 changes: 1 addition & 1 deletion src/main/java/org/ros/internal/node/DefaultNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ public <T> Publisher<T> newPublisher(GraphName topicName, String messageType) {
TopicDeclaration.newFromTopicName(resolvedTopicName, topicDescription);
Publisher<T> publisher = null;
try {
publisher = publisherFactory.newOrExisting(topicDeclaration, slaveServer.getSubscribers());
publisher = publisherFactory.newOrExisting(topicDeclaration);
} catch(IOException e) { throw new RosRuntimeException(e); }
return publisher;
}
Expand Down
8 changes: 1 addition & 7 deletions src/main/java/org/ros/internal/node/server/SlaveServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,7 @@ public SlaveServer(GraphName nodeName, BindAddress tcpRosBindAddress,
public AdvertiseAddress getTcpRosAdvertiseAddress() {
return tcpRosServer.getAdvertiseAddress();
}
/**
* Return the ChannelHandlerContext array of subscribers
* @return
*/
public ArrayBlockingQueue<ChannelHandlerContext> getSubscribers() {
return tcpRosServer.getSubscribers();
}

/**
* Start the RPC server. This start() routine requires that the
* {@link TcpRosServer} is initialized first so that the slave server returns
Expand Down
20 changes: 16 additions & 4 deletions src/main/java/org/ros/internal/node/topic/DefaultPublisher.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ public class DefaultPublisher<T> extends DefaultTopicParticipant implements Publ
private final ArrayBlockingQueue<ChannelHandlerContext> subscribers;

public DefaultPublisher(NodeIdentifier nodeIdentifier, TopicDeclaration topicDeclaration,
MessageFactory messageFactory, ScheduledExecutorService executorService, ArrayBlockingQueue<ChannelHandlerContext> arrayBlockingQueue) throws IOException {
MessageFactory messageFactory, ScheduledExecutorService executorService) throws IOException {
super(topicDeclaration);
this.nodeIdentifier = nodeIdentifier;
this.messageFactory = messageFactory;
this.subscribers = arrayBlockingQueue;
outgoingMessageQueue = new OutgoingMessageQueue<T>(executorService, arrayBlockingQueue);
this.subscribers = new ArrayBlockingQueue<ChannelHandlerContext>(1024);
outgoingMessageQueue = new OutgoingMessageQueue<T>(executorService, subscribers);
if(DEBUG)
log.info("DefaultPublisher contructed with "+outgoingMessageQueue.getNumberOfChannels()+" channels.");
listeners = new ListenerGroup<PublisherListener<T>>(executorService);
Expand Down Expand Up @@ -203,7 +203,19 @@ public void addSubscriber(SubscriberIdentifier subscriberIdentifer, ChannelHandl
//outgoingMessageQueue.addChannel(ctx);
subscribers.add(ctx);
if (DEBUG) {
log.info("Current number of subscribers:"+subscribers.size());
StringBuilder sb = new StringBuilder();
sb.append(subscribers.size());
sb.append("Subscribers for publisher ");
sb.append(toDeclaration());
sb.append(":\r\n");
Object[] sa = subscribers.toArray();
for(int i = 0; i < subscribers.size(); i++) {
sb.append(i);
sb.append(") ");
sb.append(sa[i]);
sb.append("\r\n");
}
log.info(sb.toString());
}
signalOnNewSubscriber(subscriberIdentifer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,7 @@ public void addPublisher(PublisherIdentifier publisherIdentifier, InetSocketAddr
log.info("Defaultsubscriber addPublisher topicParticipantManager DOES NOT CONTAIN "+publisherIdentifier+" at "+address);
topicParticipantManager.addSubscriberConnection(this, publisherIdentifier);
}
tcpClientManager.connect(toString(), address);
// TODO(damonkohler): knownPublishers is duplicate information that is
// already available to the TopicParticipantManager.
//knownPublishers.add(publisherIdentifier);
tcpClientManager.connect(toString(), address);
signalOnNewPublisher(publisherIdentifier);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,20 +41,19 @@ public PublisherFactory(NodeIdentifier nodeIdentifier,
*
* @param <T> the message type associated with the {@link Publisher}
* @param topicDeclaration {@link TopicDeclaration} that is being published
* @param arrayBlockingQueue
* @param messageSerializer the {@link MessageSerializer} used for published messages
* @return a new or cached {@link Publisher} instance
* @throws IOException
*/
@SuppressWarnings("unchecked")
public <T> Publisher<T> newOrExisting(TopicDeclaration topicDeclaration, ArrayBlockingQueue<ChannelHandlerContext> arrayBlockingQueue) throws IOException {
public <T> Publisher<T> newOrExisting(TopicDeclaration topicDeclaration) throws IOException {
GraphName topicName = topicDeclaration.getName();
synchronized (mutex) {
if (topicParticipantManager.hasPublisher(topicName)) {
return (DefaultPublisher<T>) topicParticipantManager.getPublisher(topicName);
} else {
DefaultPublisher<T> publisher =
new DefaultPublisher<T>(nodeIdentifier, topicDeclaration, messageFactory, executorService, arrayBlockingQueue);
new DefaultPublisher<T>(nodeIdentifier, topicDeclaration, messageFactory, executorService);
publisher.addListener(new DefaultPublisherListener<T>() {
@Override
public void onNewSubscriber(Publisher<T> publisher, SubscriberIdentifier subscriberIdentifier) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import java.util.Set;
import java.util.concurrent.Executor;

import org.ros.internal.transport.tcp.ChannelGroup;

/**
* This is the ChannelHandlerContext that links the underlying TCP Socket 'channel' to the {@link ChannelPipeline} and the {@link ChannelGroup}
Expand Down Expand Up @@ -96,12 +95,6 @@ public interface ChannelHandlerContext {
*/
ChannelPipeline pipeline();

/**
* Return the channel group
*
*/
/*Asynchronous*/ChannelGroup getChannelGroup();

/**
* Determine if this channel is ready for processing, it is configured, has a socket
* and the communication is sound. If the socket breaks this goes false and no writes are
Expand All @@ -114,12 +107,6 @@ public interface ChannelHandlerContext {
* @param ready
*/
void setReady(boolean ready);

/**
* Get the Object representing a mutex to use for completion of operation if necessary.
* @return
*/
Object getChannelCompletionMutex();

/**
* Each successive handshake completion will add another message type to this synchronized set.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.ros.internal.transport.tcp.ChannelGroup;


/**
* A handler context contains all the executor, the channel group, the channel, and the pipeline with the handlers.
Expand All @@ -30,27 +28,24 @@
* The functions of the system move data through the pipeline, triggering the handlers in the sequence they were
* added.
* Traffic is filtered to subscriber channels via the hash set of requested message types
* @author jg
* @author Jonathan Groff Copyright (C) NeoCoreTechs 2017,2022
*
*/
public class ChannelHandlerContextImpl implements ChannelHandlerContext {
private static final boolean DEBUG = false;
private static final Log log = LogFactory.getLog(ChannelHandlerContextImpl.class);
/*Asynchronous*/ChannelGroup channelGroup;
/*Asynchronous*/Socket/*Channel*/ channel;
private Executor executor;
ChannelPipeline pipeline;
boolean ready = false;
private Object mutex = new Object();
Set<String> outboundMessageTypes;
InputStream is = null;
OutputStream os = null;
ObjectInputStream ois = null;

public ChannelHandlerContextImpl(/*Asynchronous*/ChannelGroup channelGroup2, /*Asynchronous*/Socket channel2) {
channelGroup = channelGroup2;
channel = channel2;
pipeline = new ChannelPipelineImpl(this);
outboundMessageTypes = (Set<String>) new HashSet<String>();

public ChannelHandlerContextImpl(Executor executor, /*Asynchronous*/Socket channel) {
this.executor = executor;
this.channel = channel;
this.pipeline = new ChannelPipelineImpl(this);
this.outboundMessageTypes = (Set<String>) new HashSet<String>();
}

public void setChannel(/*Asynchronous*/Socket/*Channel*/ sock) {
Expand All @@ -59,7 +54,7 @@ public void setChannel(/*Asynchronous*/Socket/*Channel*/ sock) {

@Override
public Executor executor() {
return channelGroup.getExecutorService();
return executor;
}

@Override
Expand Down Expand Up @@ -115,52 +110,51 @@ public void close() throws IOException {

@Override
public Object read() throws IOException {
is = channel.getInputStream();
//ObjectInputStream ois = new ObjectInputStream(is);
ois = new ObjectInputStream(is);
try {
return ois.readObject();
} catch (ClassNotFoundException e) {
throw new IOException(e);
} catch(StreamCorruptedException sce) {
is = channel.getInputStream();
ois = new ObjectInputStream(is);
InputStream is = channel.getInputStream();
ObjectInputStream ois = new ObjectInputStream(is);
try {
return ois.readObject();
} catch (ClassNotFoundException cnf) {
throw new IOException(cnf);
} catch (ClassNotFoundException e) {
throw new IOException(e);
} catch(StreamCorruptedException sce) {
is = channel.getInputStream();
ois = new ObjectInputStream(is);
try {
return ois.readObject();
} catch (ClassNotFoundException cnf) {
throw new IOException(cnf);
}
}
}
}

@Override
public void write(Object msg) throws IOException {
os = channel.getOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(os);
oos.writeObject(msg);
oos.flush();
OutputStream os = channel.getOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(os);
oos.writeObject(msg);
oos.flush();
}

@Override
public void write(Object msg, CompletionHandler<Integer, Void> handler) {
try {
write(msg);
handler.completed(0, null);
} catch (IOException e) {
handler.failed(e, null);
}
try {
write(msg);
handler.completed(0, null);
} catch (IOException e) {
handler.failed(e, null);
}
}

@Override
public Object read(CompletionHandler<Integer, Void> handler) {
try {
Object o = read();
handler.completed(0, null);
return o;
} catch (IOException e) {
handler.failed(e, null);
}
return null;
try {
Object o = read();
handler.completed(0, null);
return o;
} catch (IOException e) {
handler.failed(e, null);
}
return null;
}


Expand All @@ -169,19 +163,14 @@ public ChannelPipeline pipeline() {
return pipeline;
}

@Override
public /*Asynchronous*/ChannelGroup getChannelGroup() {
return channelGroup;
}

@Override
public Socket channel() {
return channel;
}

@Override
public String toString() {
return new String("ChannelHandlerContext:"+channel+" ChannelGroup:"+channelGroup+" ChannelPipeline:"+pipeline+" ready:"+ready);
return new String("ChannelHandlerContext:"+channel+" ChannelPipeline:"+pipeline+" ready:"+ready);
}

@Override
Expand All @@ -194,13 +183,6 @@ public boolean isReady() {
*/
@Override
public void setReady(boolean ready) { this.ready = ready;}

/**
* Object to synchronize read and write completion for the channel in this context, since we will have
* multiple outbound writers accessing the same channel
*/
@Override
public Object getChannelCompletionMutex() { return mutex; }

/**
* Get the type of messages we want to send to the attached subscriber, based on the handshakes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,9 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channel;
import java.nio.channels.CompletionHandler;
import java.nio.channels.WritePendingException;

import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;

import org.apache.commons.logging.Log;
Expand All @@ -30,7 +27,7 @@
*/
public class OutgoingMessageQueue<T> {

private static final boolean DEBUG = false;
private static final boolean DEBUG = true;
private static final Log log = LogFactory.getLog(OutgoingMessageQueue.class);

private static final int DEQUE_CAPACITY = 16;
Expand Down Expand Up @@ -119,14 +116,14 @@ public void failed(Throwable arg0, Void arg1) {
} // loop method
}

public OutgoingMessageQueue(ExecutorService executorService, ArrayBlockingQueue<ChannelHandlerContext> arrayBlockingQueue) throws IOException {
public OutgoingMessageQueue(ExecutorService executorService, ArrayBlockingQueue<ChannelHandlerContext> subscriberQueue) throws IOException {
deque = new CircularBlockingDeque<T>(DEQUE_CAPACITY);
writer = new Writer();
//messageBufferPool = new MessageBufferPool();
latchedBuffer = MessageBuffers.dynamicBuffer();
mutex = new Object();
latchMode = false;
channels = arrayBlockingQueue;
channels = subscriberQueue;
executorService.execute(writer);
}

Expand Down
Loading

0 comments on commit 024f795

Please sign in to comment.