Skip to content

Commit

Permalink
Merge pull request #761 from camptocamp/backport/758-to-1.5
Browse files Browse the repository at this point in the history
[Backport 1.5] Add message to debug pendings
  • Loading branch information
sbrunner authored Apr 1, 2021
2 parents 3f16098 + 0be0ab7 commit ec42338
Showing 1 changed file with 8 additions and 0 deletions.
8 changes: 8 additions & 0 deletions tilecloud/store/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ def list(self):
count=1,
block=round(self._timeout_ms),
)
logger.debug("Get %d new elements", len(queues))

if not queues:
queues = self._claim_olds()
Expand Down Expand Up @@ -146,15 +147,20 @@ def delete_all(self):
self._master.xtrim(name=self._errors_name, maxlen=0)

def _claim_olds(self):
logger.debug("Claim old's")
pendings = self._master.xpending_range(
name=self._name, groupname=STREAM_GROUP, min="-", max="+", count=10
)
if not pendings:
logger.debug("Empty pendings")
# None means there is nothing pending at all
return None
to_steal = []
to_drop = []
for pending in pendings:
logger.debug(
"Pending for %d, threshold %d", int(pending["time_since_delivered"]), self._pending_timeout_ms
)
if int(pending["time_since_delivered"]) >= self._pending_timeout_ms:
id_ = pending["message_id"]
nb_retries = int(pending["times_delivered"])
Expand All @@ -172,6 +178,7 @@ def _claim_olds(self):
)
to_drop.append(id_)

logger.debug("%d elements to drop", len(to_drop))
if to_drop:
drop_messages = self._master.xclaim(
name=self._name,
Expand All @@ -192,6 +199,7 @@ def _claim_olds(self):
)
stats.increment_counter(["redis", self._name_str, "dropped"], len(to_drop))

logger.debug("%d elements to steal", len(to_steal))
if to_steal:
messages = self._master.xclaim(
name=self._name,
Expand Down

0 comments on commit ec42338

Please sign in to comment.