Skip to content

Commit

Permalink
Introduce basic correlation context matcher to relate responses and
Browse files Browse the repository at this point in the history
requests.

Replaces the isResponseRelatedToRequest in the Matcher with
CorrelationContextMatcher.isResponseRelatedToRequest(). Intended to be
extended with a followup PR
for matching messages on sending to fix issue eclipse-californium#104.

Signed-off-by: Achim Kraus <achim.kraus@bosch-si.com>
  • Loading branch information
Achim Kraus committed Jan 25, 2017
1 parent 8b42bba commit 51796e8
Show file tree
Hide file tree
Showing 9 changed files with 293 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
* Bosch Software Innovations GmbH - adapt message parsing error handling
* Joe Magerramov (Amazon Web Services) - CoAP over TCP support.
* Bosch Software Innovations GmbH - adjust request scheme for TCP
* Achim Kraus (Bosch Software Innovations GmbH) - introduce CorrelationContextMatcher
* (fix GitHub issue #104)
******************************************************************************/
package org.eclipse.californium.core.network;

Expand Down Expand Up @@ -69,6 +71,7 @@
import org.eclipse.californium.core.server.MessageDeliverer;
import org.eclipse.californium.elements.Connector;
import org.eclipse.californium.elements.CorrelationContext;
import org.eclipse.californium.elements.CorrelationContextMatcher;
import org.eclipse.californium.elements.MessageCallback;
import org.eclipse.californium.elements.RawData;
import org.eclipse.californium.elements.RawDataChannel;
Expand Down Expand Up @@ -235,7 +238,7 @@ public CoapEndpoint(final int port, final NetworkConfig config) {
* @param config The configuration values to use.
*/
public CoapEndpoint(final InetSocketAddress address, final NetworkConfig config) {
this(createUDPConnector(address, config), config, null, null);
this(createUDPConnector(address, config), config, null, null, null);
}

/**
Expand All @@ -246,7 +249,7 @@ public CoapEndpoint(final InetSocketAddress address, final NetworkConfig config)
* @param exchangeStore The store to use for keeping track of message exchanges.
*/
public CoapEndpoint(final InetSocketAddress address, final NetworkConfig config, final MessageExchangeStore exchangeStore) {
this(createUDPConnector(address, config), config, null, exchangeStore);
this(createUDPConnector(address, config), config, null, exchangeStore, null);
}

/**
Expand All @@ -259,7 +262,7 @@ public CoapEndpoint(final InetSocketAddress address, final NetworkConfig config,
* @param config The configuration values to use.
*/
public CoapEndpoint(final Connector connector, final NetworkConfig config) {
this(connector, config, null, null);
this(connector, config, null, null, null);
}

/**
Expand All @@ -271,7 +274,7 @@ public CoapEndpoint(final Connector connector, final NetworkConfig config) {
* endpoint.
*/
public CoapEndpoint(final InetSocketAddress address, final NetworkConfig config, final ObservationStore store) {
this(createUDPConnector(address, config), config, store, null);
this(createUDPConnector(address, config), config, store, null, null);
}

/**
Expand All @@ -287,22 +290,50 @@ public CoapEndpoint(final InetSocketAddress address, final NetworkConfig config,
* @param exchangeStore The store to use for keeping track of message exchanges.
*/
public CoapEndpoint(Connector connector, NetworkConfig config, ObservationStore store, MessageExchangeStore exchangeStore) {
this(connector, config, store, exchangeStore, null);
}

/**
* Creates a new endpoint for a connector, configuration, message exchange and observation store.
* <p>
* The endpoint will support the connector's implemented scheme and will bind to
* the IP address and port the connector is configured for.
*
* @param connector The connector to use.
* @param config The configuration values to use.
* @param store The store to use for keeping track of observations initiated by this
* endpoint.
* @param exchangeStore The store to use for keeping track of message exchanges.
* @param correlationContextMatcher correlation context matcher for relating
* responses to requests. If <code>null</code>, the result of
* {@link CorrelationContextMatcherFactory#create(NetworkConfig)}
* is used as matcher.
*/
public CoapEndpoint(Connector connector, NetworkConfig config, ObservationStore store,
MessageExchangeStore exchangeStore, CorrelationContextMatcher correlationContextMatcher) {
this.config = config;
this.connector = connector;
this.connector.setRawDataReceiver(new InboxImpl());
ObservationStore observationStore = store != null ? store : new InMemoryObservationStore();
this.exchangeStore = exchangeStore;
if (null == correlationContextMatcher) {
correlationContextMatcher = CorrelationContextMatcherFactory.create(config);
}
LOGGER.log(Level.CONFIG, "{0} uses {1}",
new Object[] { getClass().getSimpleName(), correlationContextMatcher.getName() });

if (connector.isSchemeSupported(CoAP.COAP_TCP_URI_SCHEME) ||
connector.isSchemeSupported(CoAP.COAP_SECURE_TCP_URI_SCHEME)) {
this.matcher = new TcpMatcher(config, new NotificationDispatcher(), observationStore);
if (connector.isSchemeSupported(CoAP.COAP_TCP_URI_SCHEME)
|| connector.isSchemeSupported(CoAP.COAP_SECURE_TCP_URI_SCHEME)) {
this.matcher = new TcpMatcher(config, new NotificationDispatcher(), observationStore,
correlationContextMatcher);
this.coapstack = new CoapTcpStack(config, new OutboxImpl());
this.serializer = new TcpDataSerializer();
this.parser = new TcpDataParser();
this.scheme = CoAP.COAP_TCP_URI_SCHEME;
this.secureScheme = CoAP.COAP_SECURE_TCP_URI_SCHEME;
} else {
this.matcher = new UdpMatcher(config, new NotificationDispatcher(), observationStore);
this.matcher = new UdpMatcher(config, new NotificationDispatcher(), observationStore,
correlationContextMatcher);
this.coapstack = new CoapUdpStack(config, new OutboxImpl());
this.serializer = new UdpDataSerializer();
this.parser = new UdpDataParser();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*******************************************************************************
* Copyright (c) 2017 Bosch Software Innovations GmbH and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.html.
*
* Contributors:
* Bosch Software Innovations GmbH - introduce CorrelationContextMatcher
* (fix GitHub issue #104)
******************************************************************************/
package org.eclipse.californium.core.network;

import org.eclipse.californium.core.network.config.NetworkConfig;
import org.eclipse.californium.elements.CorrelationContextMatcher;
import org.eclipse.californium.elements.RelaxedCorrelationContextMatcher;
import org.eclipse.californium.elements.StrictCorrelationContextMatcher;

/**
* Factory for correlation context matcher.
*/
public class CorrelationContextMatcherFactory {

/**
* Create correlation context matcher according the configuration. If
* USE_STRICT_RESPONSE_MATCHING is set, use
* {@link StrictCorrelationContextMatcher}, otherwise
* {@link RelaxedCorrelationContextMatcher}.
*
* @param config configuration.
* @return correlation context matcher
*/
public static CorrelationContextMatcher create(NetworkConfig config) {
return config.getBoolean(NetworkConfig.Keys.USE_STRICT_RESPONSE_MATCHING) ? new StrictCorrelationContextMatcher()
: new RelaxedCorrelationContextMatcher();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
* of Response(s) to Request (fix GitHub issue #1)
* Joe Magerramov (Amazon Web Services) - CoAP over TCP support.
* Achim Kraus (Bosch Software Innovations GmbH) - processing of notifies according UdpMatcher.
* Achim Kraus (Bosch Software Innovations GmbH) - replace isResponseRelatedToRequest
* with CorrelationContextMatcher
* (fix GitHub issue #104)
******************************************************************************/
package org.eclipse.californium.core.network;

Expand All @@ -36,6 +39,7 @@
import org.eclipse.californium.core.observe.NotificationListener;
import org.eclipse.californium.core.observe.ObservationStore;
import org.eclipse.californium.elements.CorrelationContext;
import org.eclipse.californium.elements.CorrelationContextMatcher;

/**
* Matcher that runs over reliable TCP/TLS protocol. Based on
Expand All @@ -45,6 +49,7 @@ public final class TcpMatcher extends BaseMatcher {

private static final Logger LOGGER = Logger.getLogger(TcpMatcher.class.getName());
private final ExchangeObserver exchangeObserver = new ExchangeObserverImpl();
private final CorrelationContextMatcher correlationContextMatcher;

/**
* Creates a new matcher for running CoAP over TCP.
Expand All @@ -54,12 +59,15 @@ public final class TcpMatcher extends BaseMatcher {
* received from peers.
* @param observationStore the object to use for keeping track of
* observations created by the endpoint this matcher is part of.
* @param correlationContextMatcher correlation context matcher to relate
* responses with requests
* @throws NullPointerException if the configuration, notification listener,
* or the observation store is {@code null}.
*/
public TcpMatcher(final NetworkConfig config, final NotificationListener notificationListener,
final ObservationStore observationStore) {
final ObservationStore observationStore, final CorrelationContextMatcher correlationContextMatcher) {
super(config, notificationListener, observationStore);
this.correlationContextMatcher = correlationContextMatcher;
}

@Override
Expand Down Expand Up @@ -124,7 +132,7 @@ public Exchange receiveResponse(final Response response, final CorrelationContex
if (exchange == null) {
// There is no exchange with the given token - ignore response
return null;
} else if (isResponseRelatedToRequest(exchange, responseContext)) {
} else if (correlationContextMatcher.isResponseRelatedToRequest(exchange.getCorrelationContext(), responseContext)) {
return exchange;
} else {
LOGGER.log(Level.INFO,
Expand All @@ -134,10 +142,6 @@ public Exchange receiveResponse(final Response response, final CorrelationContex
}
}

private static boolean isResponseRelatedToRequest(final Exchange exchange, final CorrelationContext responseContext) {
return exchange.getCorrelationContext() == null || exchange.getCorrelationContext().equals(responseContext);
}

@Override
public Exchange receiveEmptyMessage(final EmptyMessage message) {
/* ignore received empty messages via tcp */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
* explicit String concatenation
* Bosch Software Innovations GmbH - use correlation context to improve matching
* of Response(s) to Request (fix GitHub issue #1)
* Achim Kraus (Bosch Software Innovations GmbH) - replace isResponseRelatedToRequest
* with CorrelationContextMatcher
* (fix GitHub issue #104)
******************************************************************************/
package org.eclipse.californium.core.network;

Expand All @@ -39,7 +42,7 @@
import org.eclipse.californium.core.observe.ObservationStore;
import org.eclipse.californium.core.observe.ObserveRelation;
import org.eclipse.californium.elements.CorrelationContext;
import org.eclipse.californium.elements.DtlsCorrelationContext;
import org.eclipse.californium.elements.CorrelationContextMatcher;

/**
* A Matcher for CoAP messages transmitted over UDP.
Expand All @@ -50,27 +53,25 @@ public final class UdpMatcher extends BaseMatcher {

private final ExchangeObserver exchangeObserver = new ExchangeObserverImpl();
// TODO: Multicast Exchanges: should not be removed from deduplicator
private final boolean useStrictResponseMatching;
private final CorrelationContextMatcher correlationContextMatcher;

/**
* Creates a new matcher for running CoAP over UDP.
*
* @param config the configuration to use.
* @param notificationListener the callback to invoke for notifications received from peers.
* @param observationStore the object to use for keeping track of observations created by the endpoint
* this matcher is part of.
* @param notificationListener the callback to invoke for notifications
* received from peers.
* @param observationStore the object to use for keeping track of
* observations created by the endpoint this matcher is part of.
* @param correlationContextMatcher correlation context matcher to relate
* responses with requests
* @throws NullPointerException if the configuration, notification listener,
* or the observation store is {@code null}.
*/
public UdpMatcher(final NetworkConfig config, final NotificationListener notificationListener,
final ObservationStore observationStore) {
final ObservationStore observationStore, final CorrelationContextMatcher matchingStrategy) {
super(config, notificationListener, observationStore);
useStrictResponseMatching = config.getBoolean(NetworkConfig.Keys.USE_STRICT_RESPONSE_MATCHING);

LOGGER.log(Level.CONFIG, "{0} uses {1}={2}",
new Object[]{getClass().getSimpleName(),
NetworkConfig.Keys.USE_STRICT_RESPONSE_MATCHING,
useStrictResponseMatching});
this.correlationContextMatcher = matchingStrategy;
}

@Override
Expand Down Expand Up @@ -216,7 +217,7 @@ public Exchange receiveResponse(final Response response, final CorrelationContex
}
// ignore response
return null;
} else if (isResponseRelatedToRequest(exchange, responseContext)) {
} else if (correlationContextMatcher.isResponseRelatedToRequest(exchange.getCorrelationContext(), responseContext)) {

// we have received a Response matching the token of an ongoing Exchange's Request
// according to the CoAP spec (https://tools.ietf.org/html/rfc7252#section-4.5),
Expand Down Expand Up @@ -248,50 +249,6 @@ public Exchange receiveResponse(final Response response, final CorrelationContex
}
}

private boolean isResponseRelatedToRequest(final Exchange exchange, final CorrelationContext responseContext) {
if (exchange.getCorrelationContext() == null) {
// no correlation information available for request, thus any
// additional correlation information available in the response is ignored
return true;
} else if (exchange.getCorrelationContext().get(DtlsCorrelationContext.KEY_SESSION_ID) != null) {
// original request has been sent via a DTLS protected transport
// check if the response has been received in the same DTLS session
if (useStrictResponseMatching) {
return isResponseStrictlyRelatedToDtlsRequest(exchange.getCorrelationContext(), responseContext);
} else {
return isResponseRelatedToDtlsRequest(exchange.getCorrelationContext(), responseContext);
}
} else {
// compare message context used for sending original request to context
// the response has been received in
return exchange.getCorrelationContext().equals(responseContext);
}
}

private boolean isResponseRelatedToDtlsRequest(final CorrelationContext requestContext, final CorrelationContext responseContext) {
if (responseContext == null) {
return false;
} else {
return requestContext.get(DtlsCorrelationContext.KEY_SESSION_ID)
.equals(responseContext.get(DtlsCorrelationContext.KEY_SESSION_ID))
&& requestContext.get(DtlsCorrelationContext.KEY_CIPHER)
.equals(responseContext.get(DtlsCorrelationContext.KEY_CIPHER));
}
}

private boolean isResponseStrictlyRelatedToDtlsRequest(final CorrelationContext requestContext, final CorrelationContext responseContext) {
if (responseContext == null) {
return false;
} else {
return requestContext.get(DtlsCorrelationContext.KEY_SESSION_ID)
.equals(responseContext.get(DtlsCorrelationContext.KEY_SESSION_ID))
&& requestContext.get(DtlsCorrelationContext.KEY_EPOCH)
.equals(responseContext.get(DtlsCorrelationContext.KEY_EPOCH))
&& requestContext.get(DtlsCorrelationContext.KEY_CIPHER)
.equals(responseContext.get(DtlsCorrelationContext.KEY_CIPHER));
}
}

@Override
public Exchange receiveEmptyMessage(final EmptyMessage message) {

Expand Down Expand Up @@ -366,7 +323,6 @@ public void completed(final Exchange exchange) {
// this endpoint created the Exchange to respond to a request

Response response = exchange.getCurrentResponse();
Request request = exchange.getCurrentRequest();

if (response != null && response.getType() != Type.ACK) {
// this means that we have sent the response in a separate CON/NON message
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
*
* Contributors:
* Bosch Software Innovations GmbH - initial creation
* Achim Kraus (Bosch Software Innovations GmbH) - add CorrelationContextMatcher
* (fix GitHub issue #104)
******************************************************************************/
package org.eclipse.californium.core.network;

Expand Down Expand Up @@ -56,8 +58,7 @@ public void onNotification(Request request, Response response) {
}

};

TcpMatcher matcher = new TcpMatcher(config, notificationListener, new InMemoryObservationStore());
TcpMatcher matcher = new TcpMatcher(config, notificationListener, new InMemoryObservationStore(), CorrelationContextMatcherFactory.create(config));
matcher.start();
return matcher;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
*
* Contributors:
* Bosch Software Innovations GmbH - initial creation
* Achim Kraus (Bosch Software Innovations GmbH) - add CorrelationContextMatcher
* (fix GitHub issue #104)
******************************************************************************/
package org.eclipse.californium.core.network;

Expand Down Expand Up @@ -255,7 +257,7 @@ public void onNotification(Request request, Response response) {

};

UdpMatcher matcher = new UdpMatcher(config, notificationListener, observationStore);
UdpMatcher matcher = new UdpMatcher(config, notificationListener, observationStore, CorrelationContextMatcherFactory.create(config));

matcher.setMessageExchangeStore(messageExchangeStore);
matcher.start();
Expand Down
Loading

0 comments on commit 51796e8

Please sign in to comment.