Skip to content

Commit

Permalink
Introduce basic correlation strategy for matching responses and
Browse files Browse the repository at this point in the history
requests.

Replaces the isResponseRelatedToRequest in the Matcher with
CorrelationStrategy.match(). Intended to be extended with a followup PR
for matching messages on sending to fix issue eclipse-californium#104.

Signed-off-by: Achim Kraus <achim.kraus@bosch-si.com>
  • Loading branch information
Achim Kraus committed Jan 24, 2017
1 parent feace05 commit 5625e2f
Show file tree
Hide file tree
Showing 9 changed files with 300 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
* Bosch Software Innovations GmbH - adapt message parsing error handling
* Joe Magerramov (Amazon Web Services) - CoAP over TCP support.
* Bosch Software Innovations GmbH - adjust request scheme for TCP
* Achim Kraus (Bosch Software Innovations GmbH) - introduce CorrelationStrategy
* (fix GitHub issue #104)
******************************************************************************/
package org.eclipse.californium.core.network;

Expand Down Expand Up @@ -69,6 +71,7 @@
import org.eclipse.californium.core.server.MessageDeliverer;
import org.eclipse.californium.elements.Connector;
import org.eclipse.californium.elements.CorrelationContext;
import org.eclipse.californium.elements.CorrelationStrategy;
import org.eclipse.californium.elements.MessageCallback;
import org.eclipse.californium.elements.RawData;
import org.eclipse.californium.elements.RawDataChannel;
Expand Down Expand Up @@ -142,7 +145,7 @@ public class CoapEndpoint implements Endpoint {

/** The connector over which the endpoint connects to the network */
private final Connector connector;

private final String scheme;

private final String secureScheme;
Expand Down Expand Up @@ -292,17 +295,20 @@ public CoapEndpoint(Connector connector, NetworkConfig config, ObservationStore
this.connector.setRawDataReceiver(new InboxImpl());
ObservationStore observationStore = store != null ? store : new InMemoryObservationStore();
this.exchangeStore = exchangeStore;

CorrelationStrategy strategy = CorrelationStrategyFactory.create(config);
LOGGER.log(Level.CONFIG, "{0} uses {1}",
new Object[]{getClass().getSimpleName(), strategy.getName()});

if (connector.isSchemeSupported(CoAP.COAP_TCP_URI_SCHEME) ||
connector.isSchemeSupported(CoAP.COAP_SECURE_TCP_URI_SCHEME)) {
this.matcher = new TcpMatcher(config, new NotificationDispatcher(), observationStore);
this.matcher = new TcpMatcher(config, new NotificationDispatcher(), observationStore, strategy);
this.coapstack = new CoapTcpStack(config, new OutboxImpl());
this.serializer = new TcpDataSerializer();
this.parser = new TcpDataParser();
this.scheme = CoAP.COAP_TCP_URI_SCHEME;
this.secureScheme = CoAP.COAP_SECURE_TCP_URI_SCHEME;
} else {
this.matcher = new UdpMatcher(config, new NotificationDispatcher(), observationStore);
this.matcher = new UdpMatcher(config, new NotificationDispatcher(), observationStore, strategy);
this.coapstack = new CoapUdpStack(config, new OutboxImpl());
this.serializer = new UdpDataSerializer();
this.parser = new UdpDataParser();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*******************************************************************************
* Copyright (c) 2017 Bosch Software Innovations GmbH and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.html.
*
* Contributors:
* Bosch Software Innovations GmbH - introduce CorrelationStrategy
* (fix GitHub issue #104)
******************************************************************************/
package org.eclipse.californium.core.network;

import org.eclipse.californium.core.network.config.NetworkConfig;
import org.eclipse.californium.elements.CorrelationStrategy;
import org.eclipse.californium.elements.RelaxedCorrelationStrategy;
import org.eclipse.californium.elements.StrictlyCorrelationStrategy;

/**
* Factory for correlation strategy.
*/
public abstract class CorrelationStrategyFactory {

/**
* Factory instance for correlation strategy. Default creates correlation
* strategy according the configuration. If USE_STRICT_RESPONSE_MATCHING is
* set, use {@link StrictlyCorrelationStrategy}, otherwise
* {@link RelaxedCorrelationStrategy}.
*/
private static CorrelationStrategyFactory factory = new CorrelationStrategyFactory() {

protected CorrelationStrategy createStrategy(NetworkConfig config) {
return config.getBoolean(NetworkConfig.Keys.USE_STRICT_RESPONSE_MATCHING) ? new StrictlyCorrelationStrategy()
: new RelaxedCorrelationStrategy();
}
};

/**
* Create correlation strategy based on configuration.
*
* @param config configuration
* @return correlation strategy
*/
protected abstract CorrelationStrategy createStrategy(NetworkConfig config);

/**
* Replace the strategy factory with a custom factory.
*
* @param newFactory new correlation strategy factory. If null, the current
* factory is not replaced and just the old one returned.
* @return old strategy factory
*/
public static synchronized CorrelationStrategyFactory replaceFactory(CorrelationStrategyFactory newFactory) {
CorrelationStrategyFactory oldFactory = factory;
if (null != newFactory) {
factory = newFactory;
}
return oldFactory;
}

/**
* Get the current factory.
*
* @return strategy factory
*/
private static synchronized CorrelationStrategyFactory getFactory() {
return factory;
}

/**
* Create correlation strategy according the configuration.
*
* @param config configuration.
* @return correlation strategy
*/
public static CorrelationStrategy create(NetworkConfig config) {
return getFactory().createStrategy(config);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
* of Response(s) to Request (fix GitHub issue #1)
* Joe Magerramov (Amazon Web Services) - CoAP over TCP support.
* Achim Kraus (Bosch Software Innovations GmbH) - processing of notifies according UdpMatcher.
* Achim Kraus (Bosch Software Innovations GmbH) - replace isResponseRelatedToRequest
* with CorrelationStrategy
* (fix GitHub issue #104)
******************************************************************************/
package org.eclipse.californium.core.network;

Expand All @@ -36,6 +39,7 @@
import org.eclipse.californium.core.observe.NotificationListener;
import org.eclipse.californium.core.observe.ObservationStore;
import org.eclipse.californium.elements.CorrelationContext;
import org.eclipse.californium.elements.CorrelationStrategy;

/**
* Matcher that runs over reliable TCP/TLS protocol. Based on
Expand All @@ -45,6 +49,7 @@ public final class TcpMatcher extends BaseMatcher {

private static final Logger LOGGER = Logger.getLogger(TcpMatcher.class.getName());
private final ExchangeObserver exchangeObserver = new ExchangeObserverImpl();
private final CorrelationStrategy matchingStrategy;

/**
* Creates a new matcher for running CoAP over TCP.
Expand All @@ -54,12 +59,14 @@ public final class TcpMatcher extends BaseMatcher {
* received from peers.
* @param observationStore the object to use for keeping track of
* observations created by the endpoint this matcher is part of.
* @param matchingStrategy strategy to match the correlation context of request and response
* @throws NullPointerException if the configuration, notification listener,
* or the observation store is {@code null}.
*/
public TcpMatcher(final NetworkConfig config, final NotificationListener notificationListener,
final ObservationStore observationStore) {
final ObservationStore observationStore, final CorrelationStrategy matchingStrategy) {
super(config, notificationListener, observationStore);
this.matchingStrategy = matchingStrategy;
}

@Override
Expand Down Expand Up @@ -124,7 +131,7 @@ public Exchange receiveResponse(final Response response, final CorrelationContex
if (exchange == null) {
// There is no exchange with the given token - ignore response
return null;
} else if (isResponseRelatedToRequest(exchange, responseContext)) {
} else if (matchingStrategy.isResponseRelatedToRequest(exchange.getCorrelationContext(), responseContext)) {
return exchange;
} else {
LOGGER.log(Level.INFO,
Expand All @@ -134,10 +141,6 @@ public Exchange receiveResponse(final Response response, final CorrelationContex
}
}

private static boolean isResponseRelatedToRequest(final Exchange exchange, final CorrelationContext responseContext) {
return exchange.getCorrelationContext() == null || exchange.getCorrelationContext().equals(responseContext);
}

@Override
public Exchange receiveEmptyMessage(final EmptyMessage message) {
/* ignore received empty messages via tcp */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
* explicit String concatenation
* Bosch Software Innovations GmbH - use correlation context to improve matching
* of Response(s) to Request (fix GitHub issue #1)
* Achim Kraus (Bosch Software Innovations GmbH) - replace isResponseRelatedToRequest
* with CorrelationStrategy
* (fix GitHub issue #104)
******************************************************************************/
package org.eclipse.californium.core.network;

Expand All @@ -39,7 +42,7 @@
import org.eclipse.californium.core.observe.ObservationStore;
import org.eclipse.californium.core.observe.ObserveRelation;
import org.eclipse.californium.elements.CorrelationContext;
import org.eclipse.californium.elements.DtlsCorrelationContext;
import org.eclipse.californium.elements.CorrelationStrategy;

/**
* A Matcher for CoAP messages transmitted over UDP.
Expand All @@ -50,7 +53,7 @@ public final class UdpMatcher extends BaseMatcher {

private final ExchangeObserver exchangeObserver = new ExchangeObserverImpl();
// TODO: Multicast Exchanges: should not be removed from deduplicator
private final boolean useStrictResponseMatching;
private final CorrelationStrategy matchingStrategy;

/**
* Creates a new matcher for running CoAP over UDP.
Expand All @@ -59,18 +62,14 @@ public final class UdpMatcher extends BaseMatcher {
* @param notificationListener the callback to invoke for notifications received from peers.
* @param observationStore the object to use for keeping track of observations created by the endpoint
* this matcher is part of.
* @param matchingStrategy strategy to match the correlation context of request and response
* @throws NullPointerException if the configuration, notification listener,
* or the observation store is {@code null}.
*/
public UdpMatcher(final NetworkConfig config, final NotificationListener notificationListener,
final ObservationStore observationStore) {
final ObservationStore observationStore, final CorrelationStrategy matchingStrategy) {
super(config, notificationListener, observationStore);
useStrictResponseMatching = config.getBoolean(NetworkConfig.Keys.USE_STRICT_RESPONSE_MATCHING);

LOGGER.log(Level.CONFIG, "{0} uses {1}={2}",
new Object[]{getClass().getSimpleName(),
NetworkConfig.Keys.USE_STRICT_RESPONSE_MATCHING,
useStrictResponseMatching});
this.matchingStrategy = matchingStrategy;
}

@Override
Expand Down Expand Up @@ -216,7 +215,7 @@ public Exchange receiveResponse(final Response response, final CorrelationContex
}
// ignore response
return null;
} else if (isResponseRelatedToRequest(exchange, responseContext)) {
} else if (matchingStrategy.isResponseRelatedToRequest(exchange.getCorrelationContext(), responseContext)) {

// we have received a Response matching the token of an ongoing Exchange's Request
// according to the CoAP spec (https://tools.ietf.org/html/rfc7252#section-4.5),
Expand Down Expand Up @@ -248,50 +247,6 @@ public Exchange receiveResponse(final Response response, final CorrelationContex
}
}

private boolean isResponseRelatedToRequest(final Exchange exchange, final CorrelationContext responseContext) {
if (exchange.getCorrelationContext() == null) {
// no correlation information available for request, thus any
// additional correlation information available in the response is ignored
return true;
} else if (exchange.getCorrelationContext().get(DtlsCorrelationContext.KEY_SESSION_ID) != null) {
// original request has been sent via a DTLS protected transport
// check if the response has been received in the same DTLS session
if (useStrictResponseMatching) {
return isResponseStrictlyRelatedToDtlsRequest(exchange.getCorrelationContext(), responseContext);
} else {
return isResponseRelatedToDtlsRequest(exchange.getCorrelationContext(), responseContext);
}
} else {
// compare message context used for sending original request to context
// the response has been received in
return exchange.getCorrelationContext().equals(responseContext);
}
}

private boolean isResponseRelatedToDtlsRequest(final CorrelationContext requestContext, final CorrelationContext responseContext) {
if (responseContext == null) {
return false;
} else {
return requestContext.get(DtlsCorrelationContext.KEY_SESSION_ID)
.equals(responseContext.get(DtlsCorrelationContext.KEY_SESSION_ID))
&& requestContext.get(DtlsCorrelationContext.KEY_CIPHER)
.equals(responseContext.get(DtlsCorrelationContext.KEY_CIPHER));
}
}

private boolean isResponseStrictlyRelatedToDtlsRequest(final CorrelationContext requestContext, final CorrelationContext responseContext) {
if (responseContext == null) {
return false;
} else {
return requestContext.get(DtlsCorrelationContext.KEY_SESSION_ID)
.equals(responseContext.get(DtlsCorrelationContext.KEY_SESSION_ID))
&& requestContext.get(DtlsCorrelationContext.KEY_EPOCH)
.equals(responseContext.get(DtlsCorrelationContext.KEY_EPOCH))
&& requestContext.get(DtlsCorrelationContext.KEY_CIPHER)
.equals(responseContext.get(DtlsCorrelationContext.KEY_CIPHER));
}
}

@Override
public Exchange receiveEmptyMessage(final EmptyMessage message) {

Expand Down Expand Up @@ -366,7 +321,6 @@ public void completed(final Exchange exchange) {
// this endpoint created the Exchange to respond to a request

Response response = exchange.getCurrentResponse();
Request request = exchange.getCurrentRequest();

if (response != null && response.getType() != Type.ACK) {
// this means that we have sent the response in a separate CON/NON message
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
*
* Contributors:
* Bosch Software Innovations GmbH - initial creation
* Achim Kraus (Bosch Software Innovations GmbH) - add CorrelationStrategy
* (fix GitHub issue #104)
******************************************************************************/
package org.eclipse.californium.core.network;

Expand Down Expand Up @@ -56,8 +58,7 @@ public void onNotification(Request request, Response response) {
}

};

TcpMatcher matcher = new TcpMatcher(config, notificationListener, new InMemoryObservationStore());
TcpMatcher matcher = new TcpMatcher(config, notificationListener, new InMemoryObservationStore(), CorrelationStrategyFactory.create(config));
matcher.start();
return matcher;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
*
* Contributors:
* Bosch Software Innovations GmbH - initial creation
* Achim Kraus (Bosch Software Innovations GmbH) - add CorrelationStrategy
* (fix GitHub issue #104)
******************************************************************************/
package org.eclipse.californium.core.network;

Expand Down Expand Up @@ -255,7 +257,7 @@ public void onNotification(Request request, Response response) {

};

UdpMatcher matcher = new UdpMatcher(config, notificationListener, observationStore);
UdpMatcher matcher = new UdpMatcher(config, notificationListener, observationStore, CorrelationStrategyFactory.create(config));

matcher.setMessageExchangeStore(messageExchangeStore);
matcher.start();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*******************************************************************************
* Copyright (c) 2017 Bosch Software Innovations GmbH and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.html.
*
* Contributors:
* Bosch Software Innovations GmbH - add flexible correlation context matching
* (fix GitHub issue #104)
******************************************************************************/
package org.eclipse.californium.elements;

/**
* Interface for correlation context processing. Enable implementor to flexible
* decide on context correlation information.
*/
public interface CorrelationStrategy {

/**
* Return strategy name. Used for logging.
*
* @return name of strategy.
*/
String getName();

/**
* Check, if responses is related to the request.
*
* @param requestContext correlation context of request
* @param responseContext correlation context of response
* @return true, if response is related to the request, false, if response
* should not be considered for this request.
*/
boolean isResponseRelatedToRequest(CorrelationContext requestContext, CorrelationContext responseContext);

}
Loading

0 comments on commit 5625e2f

Please sign in to comment.