Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Observationstore improvement #87

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -199,7 +200,7 @@ public void sendRequest(final Exchange exchange, final Request request) {
if (request.getOptions().hasObserve() && request.getOptions().getObserve() == 0 && (!request.getOptions().hasBlock2()
|| request.getOptions().getBlock2().getNum() == 0 && !request.getOptions().getBlock2().isM())) {
// add request to the store
observationStore.add(new Observation(request, null));
List<Observation> observationRemoved = observationStore.add(new Observation(request, null));
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When we say there can be at most only one observe per resource, should not add(Observation) return one Observation instead of a List?

// remove it if the request is cancelled, rejected or timedout
request.addMessageObserver(new MessageObserverAdapter() {
@Override
Expand All @@ -215,6 +216,10 @@ public void onTimeout() {
observationStore.remove(request.getToken());
}
});
// cancel observation remove from the store
for (Observation o : observationRemoved) {
cancelPendingRequest(o.getRequest().getToken());
}
}

exchange.setObserver(exchangeObserver);
Expand Down Expand Up @@ -631,15 +636,22 @@ public void contextEstablished(Exchange exchange) {
}
}

@Override
public void cancelObserve(final byte[] token) {
// search for pending blockwise exchange for this observe request
private void cancelPendingRequest(byte[] token) {
// Cancel all request in pending exchange which are linked to this
// token.
// (this means also block-wised request relative to the original request
// with the given token)
for (Entry<KeyToken, Exchange> key : exchangesByToken.entrySet()) {
Request cachedRequest = key.getValue().getRequest();
if (cachedRequest != null && Arrays.equals(token, cachedRequest.getToken())) {
cachedRequest.cancel();
}
}
}

@Override
public void cancelObserve(final byte[] token) {
cancelPendingRequest(token);
observationStore.remove(token);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
******************************************************************************/
package org.eclipse.californium.core.observe;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

Expand All @@ -31,10 +33,11 @@ public class InMemoryObservationStore implements ObservationStore {
private Map<KeyToken, Observation> map = new ConcurrentHashMap<>();

@Override
public void add(Observation obs) {
public List<Observation> add(Observation obs) {
if (obs != null) {
map.put(new KeyToken(obs.getRequest().getToken()), obs);
}
return Collections.emptyList();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
******************************************************************************/
package org.eclipse.californium.core.observe;

import java.util.List;

import org.eclipse.californium.elements.CorrelationContext;

/**
Expand All @@ -28,8 +30,12 @@ public interface ObservationStore {

/**
* Adds an observation to the store.
*
* @return observations removed from the store pending this addition. (Could
* be used to ensure there is only one observation for a target
* resource)
*/
void add(Observation obs);
List<Observation> add(Observation obs);

/**
* Removes the observation initiated by the request with the given token.
Expand Down