diff --git a/californium-core/src/main/java/org/eclipse/californium/core/network/UdpMatcher.java b/californium-core/src/main/java/org/eclipse/californium/core/network/UdpMatcher.java index 54583b8bba..2230e76a4f 100644 --- a/californium-core/src/main/java/org/eclipse/californium/core/network/UdpMatcher.java +++ b/californium-core/src/main/java/org/eclipse/californium/core/network/UdpMatcher.java @@ -25,6 +25,7 @@ import java.util.Arrays; import java.util.Iterator; +import java.util.List; import java.util.Map.Entry; import java.util.Random; import java.util.concurrent.ConcurrentHashMap; @@ -199,7 +200,7 @@ public void sendRequest(final Exchange exchange, final Request request) { if (request.getOptions().hasObserve() && request.getOptions().getObserve() == 0 && (!request.getOptions().hasBlock2() || request.getOptions().getBlock2().getNum() == 0 && !request.getOptions().getBlock2().isM())) { // add request to the store - observationStore.add(new Observation(request, null)); + List observationRemoved = observationStore.add(new Observation(request, null)); // remove it if the request is cancelled, rejected or timedout request.addMessageObserver(new MessageObserverAdapter() { @Override @@ -215,6 +216,10 @@ public void onTimeout() { observationStore.remove(request.getToken()); } }); + // cancel observation remove from the store + for (Observation o : observationRemoved) { + cancelPendingRequest(o.getRequest().getToken()); + } } exchange.setObserver(exchangeObserver); @@ -631,15 +636,22 @@ public void contextEstablished(Exchange exchange) { } } - @Override - public void cancelObserve(final byte[] token) { - // search for pending blockwise exchange for this observe request + private void cancelPendingRequest(byte[] token) { + // Cancel all request in pending exchange which are linked to this + // token. + // (this means also block-wised request relative to the original request + // with the given token) for (Entry key : exchangesByToken.entrySet()) { Request cachedRequest = key.getValue().getRequest(); if (cachedRequest != null && Arrays.equals(token, cachedRequest.getToken())) { cachedRequest.cancel(); } } + } + + @Override + public void cancelObserve(final byte[] token) { + cancelPendingRequest(token); observationStore.remove(token); } } diff --git a/californium-core/src/main/java/org/eclipse/californium/core/observe/InMemoryObservationStore.java b/californium-core/src/main/java/org/eclipse/californium/core/observe/InMemoryObservationStore.java index f6656ff801..0f7e2ebe0f 100644 --- a/californium-core/src/main/java/org/eclipse/californium/core/observe/InMemoryObservationStore.java +++ b/californium-core/src/main/java/org/eclipse/californium/core/observe/InMemoryObservationStore.java @@ -13,6 +13,8 @@ ******************************************************************************/ package org.eclipse.californium.core.observe; +import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -31,10 +33,11 @@ public class InMemoryObservationStore implements ObservationStore { private Map map = new ConcurrentHashMap<>(); @Override - public void add(Observation obs) { + public List add(Observation obs) { if (obs != null) { map.put(new KeyToken(obs.getRequest().getToken()), obs); } + return Collections.emptyList(); } @Override diff --git a/californium-core/src/main/java/org/eclipse/californium/core/observe/ObservationStore.java b/californium-core/src/main/java/org/eclipse/californium/core/observe/ObservationStore.java index fa82502f80..3dfb144caf 100644 --- a/californium-core/src/main/java/org/eclipse/californium/core/observe/ObservationStore.java +++ b/californium-core/src/main/java/org/eclipse/californium/core/observe/ObservationStore.java @@ -13,6 +13,8 @@ ******************************************************************************/ package org.eclipse.californium.core.observe; +import java.util.List; + import org.eclipse.californium.elements.CorrelationContext; /** @@ -28,8 +30,12 @@ public interface ObservationStore { /** * Adds an observation to the store. + * + * @return observations removed from the store pending this addition. (Could + * be used to ensure there is only one observation for a target + * resource) */ - void add(Observation obs); + List add(Observation obs); /** * Removes the observation initiated by the request with the given token.