Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix tcp #120

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.eclipse.californium.core.coap.EmptyMessage;
import org.eclipse.californium.core.coap.Request;
import org.eclipse.californium.core.coap.Response;
import org.eclipse.californium.core.coap.CoAP.Type;
import org.eclipse.californium.core.network.Exchange;
import org.eclipse.californium.core.network.Exchange.Origin;
import org.eclipse.californium.core.network.Outbox;
Expand Down Expand Up @@ -106,13 +107,19 @@ public CoapTcpStack(NetworkConfig config, Outbox outbox) {

// delegate to top
@Override public void sendRequest(Request request) {
if (null == request.getType()) {
request.setType(Type.CON);
}
top.sendRequest(request);
// CoAP over TCP does not have acknowledgements. Everything is automatically acknowledged.
request.setAcknowledged(true);
}

// delegate to top
@Override public void sendResponse(Exchange exchange, Response response) {
if (null == response.getType()) {
response.setType(Type.CON);
}
top.sendResponse(exchange, response);
// CoAP over TCP does not have acknowledgements. Everything is automatically acknowledged.
response.setAcknowledged(true);
Expand Down Expand Up @@ -143,17 +150,19 @@ public CoapTcpStack(NetworkConfig config, Outbox outbox) {
}

@Override public void setExecutor(ScheduledExecutorService executor) {
for (Layer layer : layers)
for (Layer layer : layers) {
layer.setExecutor(executor);
}
}

@Override public void setDeliverer(MessageDeliverer deliverer) {
this.deliverer = deliverer;
}

@Override public void destroy() {
for (Layer layer : layers)
for (Layer layer : layers) {
layer.destroy();
}
}

private class StackTopAdapter extends AbstractLayer {
Expand All @@ -175,8 +184,9 @@ public void sendRequest(Request request) {

@Override public void receiveRequest(Exchange exchange, Request request) {
// if there is no BlockwiseLayer we still have to set it
if (exchange.getRequest() == null)
if (exchange.getRequest() == null) {
exchange.setRequest(request);
}
if (hasDeliverer()) {
deliverer.deliverRequest(exchange);
} else {
Expand All @@ -185,8 +195,9 @@ public void sendRequest(Request request) {
}

@Override public void receiveResponse(Exchange exchange, Response response) {
if (!response.getOptions().hasObserve())
if (!response.getOptions().hasObserve()) {
exchange.setComplete();
}
if (hasDeliverer()) {
deliverer.deliverResponse(exchange, response); // notify request that response has arrived
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public static void main(String[] args) {
.setInt(NetworkConfig.Keys.PROTOCOL_STAGE_THREAD_COUNT, 2)
.setLong(NetworkConfig.Keys.EXCHANGE_LIFETIME, 10000);

Connector serverConnector = new TcpServerConnector(new InetSocketAddress(CoAP.DEFAULT_COAP_PORT), 100, 1);
Connector serverConnector = new TcpServerConnector(new InetSocketAddress(CoAP.DEFAULT_COAP_PORT), 1, 100);
CoapEndpoint endpoint = new CoapEndpoint(serverConnector, net);

CoapServer server = new CoapServer(net);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public TcpClientConnector(int numberOfThreads, int connectTimeoutMillis, int idl
this.listenUri = URI.create(String.format("%s://127.0.0.1:0", getSupportedScheme()));
}

@Override public void start() throws IOException {
@Override public synchronized void start() throws IOException {
if (rawDataChannel == null) {
throw new IllegalStateException("Cannot start without message handler.");
}
Expand All @@ -86,9 +86,11 @@ public TcpClientConnector(int numberOfThreads, int connectTimeoutMillis, int idl
};
}

@Override public void stop() {
workerGroup.shutdownGracefully(0, 1, TimeUnit.SECONDS).syncUninterruptibly();
workerGroup = null;
@Override public synchronized void stop() {
if (null != workerGroup) {
workerGroup.shutdownGracefully(0, 1, TimeUnit.SECONDS).syncUninterruptibly();
workerGroup = null;
}
}

@Override public void destroy() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,14 @@ public class TcpServerConnector implements Connector {
private EventLoopGroup workerGroup;
private URI listenUri;

public TcpServerConnector(InetSocketAddress localAddress, int idleTimeout, int numberOfThreads) {
public TcpServerConnector(InetSocketAddress localAddress, int numberOfThreads, int idleTimeout) {
this.numberOfThreads = numberOfThreads;
this.connectionIdleTimeoutSeconds = idleTimeout;
this.localAddress = localAddress;
this.listenUri = getListenUri(localAddress);
}

@Override public void start() throws IOException {
@Override public synchronized void start() throws IOException {
if (rawDataChannel == null) {
throw new IllegalStateException("Cannot start without message handler.");
}
Expand Down Expand Up @@ -92,12 +92,15 @@ public void operationComplete(ChannelFuture future) throws Exception {
});
}

@Override public void stop() {
bossGroup.shutdownGracefully(0, 1, TimeUnit.SECONDS).syncUninterruptibly();
workerGroup.shutdownGracefully(0, 1, TimeUnit.SECONDS).syncUninterruptibly();

workerGroup = null;
bossGroup = null;
@Override public synchronized void stop() {
if (null != bossGroup) {
bossGroup.shutdownGracefully(0, 1, TimeUnit.SECONDS).syncUninterruptibly();
bossGroup = null;
}
if (null != workerGroup) {
workerGroup.shutdownGracefully(0, 1, TimeUnit.SECONDS).syncUninterruptibly();
workerGroup = null;
}
}

@Override public void destroy() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
@RunWith(Parameterized.class)
public class TcpConnectorTest {

private static final int NUMBER_OF_THREADS = 1;
private static final int IDLE_TIMEOUT = 100;

@Rule public final Timeout timeout = new Timeout(20, TimeUnit.SECONDS);

private final int messageSize;
Expand Down Expand Up @@ -62,8 +65,8 @@ public void cleanup() {
@Test
public void serverClientPingPong() throws Exception {
int port = findEphemeralPort();
TcpServerConnector server = new TcpServerConnector(new InetSocketAddress(port), 100, 1);
TcpClientConnector client = new TcpClientConnector(1, 100, 100);
TcpServerConnector server = new TcpServerConnector(new InetSocketAddress(port), NUMBER_OF_THREADS, IDLE_TIMEOUT);
TcpClientConnector client = new TcpClientConnector(NUMBER_OF_THREADS, 100, IDLE_TIMEOUT);

cleanup.add(server);
cleanup.add(client);
Expand Down Expand Up @@ -92,7 +95,7 @@ public void serverClientPingPong() throws Exception {
public void singleServerManyClients() throws Exception {
int port = findEphemeralPort();
int clients = 100;
TcpServerConnector server = new TcpServerConnector(new InetSocketAddress(port), 100, 1);
TcpServerConnector server = new TcpServerConnector(new InetSocketAddress(port), NUMBER_OF_THREADS, IDLE_TIMEOUT);
assertThat(server.getUri().getScheme(), is("coap+tcp"));
cleanup.add(server);

Expand All @@ -102,7 +105,7 @@ public void singleServerManyClients() throws Exception {

List<RawData> messages = new ArrayList<>();
for (int i = 0; i < clients; i++) {
TcpClientConnector client = new TcpClientConnector(1, 100, 100);
TcpClientConnector client = new TcpClientConnector(NUMBER_OF_THREADS, 100, IDLE_TIMEOUT);
cleanup.add(client);
Catcher clientCatcher = new Catcher();
client.setRawDataReceiver(clientCatcher);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@

public class TlsConnectorTest {

private static final int NUMBER_OF_THREADS = 1;
private static final int IDLE_TIMEOUT = 100;
private static SSLContext serverContext;
private static SSLContext clientContext;
private final Random random = new Random(0);
Expand Down Expand Up @@ -73,8 +75,8 @@ public void cleanup() {
@Test
public void pingPongMessage() throws Exception {
int port = findEphemeralPort();
TlsServerConnector server = new TlsServerConnector(serverContext, new InetSocketAddress(port), 100, 1);
TlsClientConnector client = new TlsClientConnector(clientContext, 1, 100, 10);
TlsServerConnector server = new TlsServerConnector(serverContext, new InetSocketAddress(port), NUMBER_OF_THREADS, IDLE_TIMEOUT);
TlsClientConnector client = new TlsClientConnector(clientContext, NUMBER_OF_THREADS, 100, 10);

Catcher serverCatcher = new Catcher();
Catcher clientCatcher = new Catcher();
Expand Down Expand Up @@ -102,7 +104,7 @@ public void pingPongMessage() throws Exception {
public void singleServerManyClients() throws Exception {
int port = findEphemeralPort();
int clients = 100;
TlsServerConnector server = new TlsServerConnector(serverContext, new InetSocketAddress(port), 100, 1);
TlsServerConnector server = new TlsServerConnector(serverContext, new InetSocketAddress(port), NUMBER_OF_THREADS, IDLE_TIMEOUT);
assertThat(server.getUri().getScheme(), is("coaps+tcp"));
cleanup.add(server);

Expand All @@ -112,7 +114,7 @@ public void singleServerManyClients() throws Exception {

List<RawData> messages = new ArrayList<>();
for (int i = 0; i < clients; i++) {
TlsClientConnector client = new TlsClientConnector(clientContext, 1, 100, 100);
TlsClientConnector client = new TlsClientConnector(clientContext, NUMBER_OF_THREADS, 100, IDLE_TIMEOUT);
cleanup.add(client);
Catcher clientCatcher = new Catcher();
client.setRawDataReceiver(clientCatcher);
Expand Down Expand Up @@ -145,7 +147,7 @@ public void singleClientManyServers() throws Exception {
Map<InetSocketAddress, Catcher> servers = new IdentityHashMap<>();
for (int i = 0; i < serverCount; i++) {
int port = findEphemeralPort();
TlsServerConnector server = new TlsServerConnector(serverContext, new InetSocketAddress(port), 100, 1);
TlsServerConnector server = new TlsServerConnector(serverContext, new InetSocketAddress(port), NUMBER_OF_THREADS, IDLE_TIMEOUT);
cleanup.add(server);
Catcher serverCatcher = new Catcher();
server.setRawDataReceiver(serverCatcher);
Expand All @@ -154,7 +156,7 @@ public void singleClientManyServers() throws Exception {
servers.put(server.getAddress(), serverCatcher);
}

TlsClientConnector client = new TlsClientConnector(clientContext, 1, 100, 100);
TlsClientConnector client = new TlsClientConnector(clientContext, NUMBER_OF_THREADS, 100, IDLE_TIMEOUT);
cleanup.add(client);
Catcher clientCatcher = new Catcher();
client.setRawDataReceiver(clientCatcher);
Expand Down