From e581476addce704300160f3af99f51123cac4693 Mon Sep 17 00:00:00 2001 From: Simon Bernard Date: Mon, 9 Oct 2017 16:13:43 +0200 Subject: [PATCH 1/4] Do not remove registration by endpoint, if no registration by Id. --- .../cluster/RedisRegistrationStore.java | 53 ++++++++++--------- 1 file changed, 28 insertions(+), 25 deletions(-) diff --git a/leshan-server-cluster/src/main/java/org/eclipse/leshan/server/cluster/RedisRegistrationStore.java b/leshan-server-cluster/src/main/java/org/eclipse/leshan/server/cluster/RedisRegistrationStore.java index 3746a6a3c2..4a8f2943e3 100644 --- a/leshan-server-cluster/src/main/java/org/eclipse/leshan/server/cluster/RedisRegistrationStore.java +++ b/leshan-server-cluster/src/main/java/org/eclipse/leshan/server/cluster/RedisRegistrationStore.java @@ -292,37 +292,39 @@ public void remove() { @Override public Deregistration removeRegistration(String registrationId) { try (Jedis j = pool.getResource()) { + return removeRegistration(j, registrationId, false); + } + } - byte[] regKey = toRegIdKey(registrationId); - - // fetch the client ep by registration ID index - byte[] ep = j.get(regKey); - if (ep == null) { - return null; - } - - byte[] data = j.get(toEndpointKey(ep)); - if (data == null) { - return null; - } + private Deregistration removeRegistration(Jedis j, String registrationId, boolean removeOnlyIfNotAlive) { + // fetch the client ep by registration ID index + byte[] ep = j.get(toRegIdKey(registrationId)); + if (ep == null) { + return null; + } - Registration r = deserializeReg(data); - deleteRegistration(j, r); - Collection obsRemoved = unsafeRemoveAllObservations(j, r.getId()); - return new Deregistration(r, obsRemoved); + // fetch the client + byte[] data = j.get(toEndpointKey(ep)); + if (data == null) { + return null; } - } - private void deleteRegistration(Jedis j, Registration r) { + Registration r = deserializeReg(data); + byte[] lockValue = null; byte[] lockKey = toLockKey(r.getEndpoint()); try { lockValue = RedisLock.acquire(j, lockKey); - // delete all entries - j.del(toRegIdKey(r.getId())); - j.del(toEndpointKey(r.getEndpoint())); - + if (!removeOnlyIfNotAlive || !r.isAlive()) { + long nbRemoved = j.del(toRegIdKey(r.getId())); + if (nbRemoved > 0) { + j.del(toEndpointKey(r.getEndpoint())); + Collection obsRemoved = unsafeRemoveAllObservations(j, r.getId()); + return new Deregistration(r, obsRemoved); + } + } + return null; } finally { RedisLock.release(j, lockKey, lockValue); } @@ -669,13 +671,14 @@ public void run() { ScanParams params = new ScanParams().match(REG_EP + "*").count(100); String cursor = "0"; do { - // TODO we probably need a lock here ScanResult res = j.scan(cursor.getBytes(), params); for (byte[] key : res.getResult()) { Registration r = deserializeReg(j.get(key)); if (!r.isAlive()) { - deleteRegistration(j, r); - expirationListener.registrationExpired(r, new ArrayList()); + Deregistration dereg = removeRegistration(j, r.getId(), true); + if (dereg != null) + expirationListener.registrationExpired(dereg.getRegistration(), + dereg.getObservations()); } } cursor = res.getStringCursor(); From 6cad5808f09ee577de9c6c380c15981870b8e09c Mon Sep 17 00:00:00 2001 From: Simon Bernard Date: Mon, 9 Oct 2017 16:26:54 +0200 Subject: [PATCH 2/4] Deserialize registration in protected block (after lock) --- .../cluster/RedisRegistrationStore.java | 35 +++++++++---------- 1 file changed, 17 insertions(+), 18 deletions(-) diff --git a/leshan-server-cluster/src/main/java/org/eclipse/leshan/server/cluster/RedisRegistrationStore.java b/leshan-server-cluster/src/main/java/org/eclipse/leshan/server/cluster/RedisRegistrationStore.java index 4a8f2943e3..1cf68543c9 100644 --- a/leshan-server-cluster/src/main/java/org/eclipse/leshan/server/cluster/RedisRegistrationStore.java +++ b/leshan-server-cluster/src/main/java/org/eclipse/leshan/server/cluster/RedisRegistrationStore.java @@ -163,19 +163,19 @@ public UpdatedRegistration updateRegistration(RegistrationUpdate update) { return null; } - // fetch the client - byte[] data = j.get(toEndpointKey(ep)); - if (data == null) { - return null; - } - - Registration r = deserializeReg(data); - byte[] lockValue = null; - byte[] lockKey = toLockKey(r.getEndpoint()); + byte[] lockKey = toLockKey(ep); try { lockValue = RedisLock.acquire(j, lockKey); + // fetch the client + byte[] data = j.get(toEndpointKey(ep)); + if (data == null) { + return null; + } + + Registration r = deserializeReg(data); + Registration updatedRegistration = update.update(r); // store the new client @@ -303,19 +303,18 @@ private Deregistration removeRegistration(Jedis j, String registrationId, boolea return null; } - // fetch the client - byte[] data = j.get(toEndpointKey(ep)); - if (data == null) { - return null; - } - - Registration r = deserializeReg(data); - byte[] lockValue = null; - byte[] lockKey = toLockKey(r.getEndpoint()); + byte[] lockKey = toLockKey(ep); try { lockValue = RedisLock.acquire(j, lockKey); + // fetch the client + byte[] data = j.get(toEndpointKey(ep)); + if (data == null) { + return null; + } + Registration r = deserializeReg(data); + if (!removeOnlyIfNotAlive || !r.isAlive()) { long nbRemoved = j.del(toRegIdKey(r.getId())); if (nbRemoved > 0) { From 02d51909d87a1f922bb77cb50cecc3896f0c4edf Mon Sep 17 00:00:00 2001 From: Simon Bernard Date: Mon, 9 Oct 2017 16:38:13 +0200 Subject: [PATCH 3/4] Add clemency for registration expiration to redisStore --- .../cluster/RedisRegistrationStore.java | 22 +++++++++++++------ .../server/registration/Registration.java | 14 +++++++++++- 2 files changed, 28 insertions(+), 8 deletions(-) diff --git a/leshan-server-cluster/src/main/java/org/eclipse/leshan/server/cluster/RedisRegistrationStore.java b/leshan-server-cluster/src/main/java/org/eclipse/leshan/server/cluster/RedisRegistrationStore.java index 1cf68543c9..5379cec735 100644 --- a/leshan-server-cluster/src/main/java/org/eclipse/leshan/server/cluster/RedisRegistrationStore.java +++ b/leshan-server-cluster/src/main/java/org/eclipse/leshan/server/cluster/RedisRegistrationStore.java @@ -62,6 +62,11 @@ */ public class RedisRegistrationStore implements CaliforniumRegistrationStore, Startable, Stoppable { + /** Default time in seconds between 2 cleaning tasks (used to remove expired registration). */ + public static final long DEFAULT_CLEAN_PERIOD = 60; + /** Defaut Extra time for registration lifetime in seconds */ + public static final long DEFAULT_GRACE_PERIOD = 0; + private static final Logger LOG = LoggerFactory.getLogger(RedisRegistrationStore.class); // Redis key prefixes @@ -78,21 +83,24 @@ public class RedisRegistrationStore implements CaliforniumRegistrationStore, Sta private final ScheduledExecutorService schedExecutor; private final long cleanPeriod; // in seconds + private final long gracePeriod; // in seconds public RedisRegistrationStore(Pool p) { - this(p, 60); // default clean period 60s + this(p, DEFAULT_CLEAN_PERIOD, DEFAULT_GRACE_PERIOD); // default clean period 60s } - public RedisRegistrationStore(Pool p, long cleanPeriodInSec) { + public RedisRegistrationStore(Pool p, long cleanPeriodInSec, long lifetimeGracePeriodInSec) { this(p, Executors.newScheduledThreadPool(1, new NamedThreadFactory(String.format("RedisRegistrationStore Cleaner (%ds)", cleanPeriodInSec))), - cleanPeriodInSec); + cleanPeriodInSec, lifetimeGracePeriodInSec); } - public RedisRegistrationStore(Pool p, ScheduledExecutorService schedExecutor, long cleanPeriodInSec) { + public RedisRegistrationStore(Pool p, ScheduledExecutorService schedExecutor, long cleanPeriodInSec, + long lifetimeGracePeriodInSec) { this.pool = p; this.schedExecutor = schedExecutor; this.cleanPeriod = cleanPeriodInSec; + this.gracePeriod = lifetimeGracePeriodInSec; } /* *************** Redis Key utility function **************** */ @@ -205,7 +213,7 @@ public Registration getRegistrationByEndpoint(String endpoint) { return null; } Registration r = deserializeReg(data); - return r.isAlive() ? r : null; + return r.isAlive(gracePeriod) ? r : null; } } @@ -315,7 +323,7 @@ private Deregistration removeRegistration(Jedis j, String registrationId, boolea } Registration r = deserializeReg(data); - if (!removeOnlyIfNotAlive || !r.isAlive()) { + if (!removeOnlyIfNotAlive || !r.isAlive(gracePeriod)) { long nbRemoved = j.del(toRegIdKey(r.getId())); if (nbRemoved > 0) { j.del(toEndpointKey(r.getEndpoint())); @@ -673,7 +681,7 @@ public void run() { ScanResult res = j.scan(cursor.getBytes(), params); for (byte[] key : res.getResult()) { Registration r = deserializeReg(j.get(key)); - if (!r.isAlive()) { + if (!r.isAlive(gracePeriod)) { Deregistration dereg = removeRegistration(j, r.getId(), true); if (dereg != null) expirationListener.registrationExpired(dereg.getRegistration(), diff --git a/leshan-server-core/src/main/java/org/eclipse/leshan/server/registration/Registration.java b/leshan-server-core/src/main/java/org/eclipse/leshan/server/registration/Registration.java index e852ad0dd8..dc802c4dc8 100644 --- a/leshan-server-core/src/main/java/org/eclipse/leshan/server/registration/Registration.java +++ b/leshan-server-core/src/main/java/org/eclipse/leshan/server/registration/Registration.java @@ -261,8 +261,20 @@ public Date getLastUpdate() { return lastUpdate; } + /** + * @return true if the last registration update was done less than lifetime seconds ago. + */ public boolean isAlive() { - return lastUpdate.getTime() + lifeTimeInSec * 1000 > System.currentTimeMillis(); + return isAlive(0); + } + + /** + * This is the same idea than {@link Registration#isAlive()} but with a grace period.
+ * + * @return true if the last registration update was done less than lifetime+gracePeriod seconds ago. + */ + public boolean isAlive(long gracePeriodInSec) { + return lastUpdate.getTime() + lifeTimeInSec * 1000 + gracePeriodInSec * 1000 > System.currentTimeMillis(); } public Map getAdditionalRegistrationAttributes() { From 3b8e7a91b95560cfeca35f9ee943ef63f062bdca Mon Sep 17 00:00:00 2001 From: Simon Bernard Date: Mon, 9 Oct 2017 16:52:55 +0200 Subject: [PATCH 4/4] remove isAlive test on getRegistrationByEndpoint in redisStore --- .../eclipse/leshan/server/cluster/RedisRegistrationStore.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/leshan-server-cluster/src/main/java/org/eclipse/leshan/server/cluster/RedisRegistrationStore.java b/leshan-server-cluster/src/main/java/org/eclipse/leshan/server/cluster/RedisRegistrationStore.java index 5379cec735..66d51ba21e 100644 --- a/leshan-server-cluster/src/main/java/org/eclipse/leshan/server/cluster/RedisRegistrationStore.java +++ b/leshan-server-cluster/src/main/java/org/eclipse/leshan/server/cluster/RedisRegistrationStore.java @@ -212,8 +212,7 @@ public Registration getRegistrationByEndpoint(String endpoint) { if (data == null) { return null; } - Registration r = deserializeReg(data); - return r.isAlive(gracePeriod) ? r : null; + return deserializeReg(data); } }