Be able to use Redis sentinel
sbrunner committed Jun 4, 2020
1 parent b384f80 commit db5702e
Showing 1 changed file with 40 additions and 26 deletions.
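This commit replaces the store's single self._redis connection with a self._master/self._slave pair, so the store can either connect directly via url (as before) or discover a Sentinel-managed master via the new sentinels, service_name, sentinel_kwargs, and connection_kwargs arguments. A minimal usage sketch, not part of the commit (host names and ports are hypothetical):

    from tilecloud.store.redis import RedisTileStore

    # Direct connection, unchanged behaviour:
    store = RedisTileStore(url="redis://localhost:6379", name="tilecloud")

    # New with this commit: locate the master through Sentinel.
    ha_store = RedisTileStore(
        sentinels=[("sentinel-1", 26379), ("sentinel-2", 26379)],  # hypothetical hosts
        service_name="mymaster",
        name="tilecloud",
    )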
66 changes: 40 additions & 26 deletions tilecloud/store/redis.py
@@ -4,7 +4,7 @@
 import os
 import socket
 
-import redis
+import redis.sentinel
 
 from c2cwsgiutils import stats
 from tilecloud import TileStore
@@ -23,18 +23,32 @@ class RedisTileStore(TileStore):
 
     def __init__(
         self,
-        url,
+        url=None,
         name="tilecloud",
         stop_if_empty=True,
         timeout=5,
         pending_timeout=5 * 60,
         max_retries=5,
         max_errors_age=24 * 3600,
         max_errors_nb=100,
+        sentinels=None,
+        service_name="mymaster",
+        sentinel_kwargs=None,
+        connection_kwargs={},
         **kwargs,
     ):
-        super(RedisTileStore, self).__init__(**kwargs)
-        self._redis = redis.Redis.from_url(url)
+        super().__init__(**kwargs)
+
+        if sentinels is not None:
+            sentinel = redis.sentinel.Sentinel(
+                sentinels, sentinel_kwargs=sentinel_kwargs, **connection_kwargs
+            )
+            self._master = sentinel.master_for(service_name)
+            self._slave = sentinel.slave_for(service_name)
+        else:
+            self._master = redis.Redis.from_url(url, **connection_kwargs)
+            self._slave = self._master
+
         self._stop_if_empty = stop_if_empty
         self._timeout_ms = int(timeout * 1000)
         self._pending_timeout_ms = int(pending_timeout * 1000)
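For reference, a standalone sketch of the redis-py Sentinel API the new branch relies on (Sentinel, master_for, slave_for); addresses and the service name are placeholders:

    import redis.sentinel

    sentinel = redis.sentinel.Sentinel(
        [("sentinel-1", 26379), ("sentinel-2", 26379)],
        sentinel_kwargs={"socket_timeout": 0.5},
    )
    # Proxy objects that re-resolve the current master/replica on each use,
    # so a Sentinel-triggered failover is followed transparently.
    master = sentinel.master_for("mymaster")
    slave = sentinel.slave_for("mymaster")  # falls back to the master if no replica is up
    master.set("probe", b"1")
    print(slave.get("probe"))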
@@ -47,7 +61,7 @@ def __init__(
         self._name = name.encode("utf-8")
         self._errors_name = self._name + b"_errors"
         try:
-            self._redis.xgroup_create(name=self._name, groupname=STREAM_GROUP, id="0-0", mkstream=True)
+            self._master.xgroup_create(name=self._name, groupname=STREAM_GROUP, id="0-0", mkstream=True)
         except redis.ResponseError as e:
             if "BUSYGROUP" not in str(e):
                 raise
@@ -62,7 +76,7 @@ def get_one(tile):
     def list(self):
         count = 0
         while True:
-            queues = self._redis.xreadgroup(
+            queues = self._slave.xreadgroup(
                 groupname=STREAM_GROUP,
                 consumername=CONSUMER_NAME,
                 streams={self._name: ">"},
@@ -93,14 +107,14 @@ def list(self):
 
             if count % 10 == 0:
                 stats.set_gauge(
-                    ["redis", self._name_str, "nb_messages"], self._redis.xlen(name=self._name)
+                    ["redis", self._name_str, "nb_messages"], self._slave.xlen(name=self._name)
                 )
-                pending = self._redis.xpending(self._name, STREAM_GROUP)
+                pending = self._slave.xpending(self._name, STREAM_GROUP)
                 stats.set_gauge(["redis", self._name_str, "pending"], pending["pending"])
 
     def put_one(self, tile):
         try:
-            self._redis.xadd(name=self._name, fields={"message": encode_message(tile)})
+            self._master.xadd(name=self._name, fields={"message": encode_message(tile)})
         except Exception as e:
             logger.warning("Failed sending SQS message", exc_info=True)
             tile.error = e
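put_one and list above form the produce/consume pair of a stream consumer group: XADD appends an entry, XREADGROUP delivers not-yet-delivered entries to a named consumer. A condensed standalone sketch with redis-py (stream and group names are made up; assumes a reachable local Redis):

    import redis

    r = redis.Redis()
    try:
        r.xgroup_create(name="jobs", groupname="workers", id="0-0", mkstream=True)
    except redis.ResponseError as e:
        if "BUSYGROUP" not in str(e):  # the group already existing is fine
            raise
    r.xadd(name="jobs", fields={"message": b"payload"})
    entries = r.xreadgroup(
        groupname="workers",
        consumername="worker-1",
        streams={"jobs": ">"},  # ">" = entries never delivered to this group
        count=1,
        block=5000,
    )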
@@ -115,22 +129,22 @@ def delete_one(self, tile):
         assert hasattr(tile, "from_redis")
         assert hasattr(tile, "sqs_message")
         assert tile.from_redis is True
-        self._redis.xack(self._name, STREAM_GROUP, tile.sqs_message)
-        self._redis.xdel(self._name, tile.sqs_message)
+        self._master.xack(self._name, STREAM_GROUP, tile.sqs_message)
+        self._master.xdel(self._name, tile.sqs_message)
         return tile
 
     def delete_all(self):
         """
         Used only by tests
         """
-        self._redis.xtrim(name=self._name, maxlen=0)
+        self._master.xtrim(name=self._name, maxlen=0)
         # xtrim doesn't empty the group claims. So we have to delete and re-create groups
-        self._redis.xgroup_destroy(name=self._name, groupname=STREAM_GROUP)
-        self._redis.xgroup_create(name=self._name, groupname=STREAM_GROUP, id="0-0", mkstream=True)
-        self._redis.xtrim(name=self._errors_name, maxlen=0)
+        self._master.xgroup_destroy(name=self._name, groupname=STREAM_GROUP)
+        self._master.xgroup_create(name=self._name, groupname=STREAM_GROUP, id="0-0", mkstream=True)
+        self._master.xtrim(name=self._errors_name, maxlen=0)
 
     def _claim_olds(self):
-        pendings = self._redis.xpending_range(
+        pendings = self._master.xpending_range(
             name=self._name, groupname=STREAM_GROUP, min="-", max="+", count=10
         )
         if not pendings:
@@ -157,27 +171,27 @@ def _claim_olds(self):
                 to_drop.append(id_)
 
         if to_drop:
-            drop_messages = self._redis.xclaim(
+            drop_messages = self._master.xclaim(
                 name=self._name,
                 groupname=STREAM_GROUP,
                 consumername=CONSUMER_NAME,
                 min_idle_time=self._pending_timeout_ms,
                 message_ids=to_drop,
             )
             drop_ids = [drop_message[0] for drop_message in drop_messages]
-            self._redis.xack(self._name, STREAM_GROUP, *drop_ids)
-            self._redis.xdel(self._name, *drop_ids)
+            self._master.xack(self._name, STREAM_GROUP, *drop_ids)
+            self._master.xdel(self._name, *drop_ids)
             for drop_id, drop_message in drop_messages:
                 tile = decode_message(drop_message[b"message"])
-                self._redis.xadd(
+                self._master.xadd(
                     name=self._errors_name,
                     fields=dict(tilecoord=str(tile.tilecoord)),
                     maxlen=self._max_errors_nb,
                 )
             stats.increment_counter(["redis", self._name_str, "dropped"], len(to_drop))
 
         if to_steal:
-            messages = self._redis.xclaim(
+            messages = self._master.xclaim(
                 name=self._name,
                 groupname=STREAM_GROUP,
                 consumername=CONSUMER_NAME,
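_claim_olds follows the standard recovery pattern for stream consumer groups: XPENDING lists entries that were delivered but never acknowledged, and XCLAIM transfers ownership of those idle longer than a timeout so they can be retried or dropped. A condensed sketch of the same pattern (key, group, and consumer names are made up):

    import redis

    r = redis.Redis()  # assumes the "jobs"/"workers" stream and group from above
    pending = r.xpending_range("jobs", "workers", min="-", max="+", count=10)
    stale = [p["message_id"] for p in pending if p["time_since_delivered"] > 300_000]
    if stale:
        # Take over entries idle for more than five minutes and reprocess them.
        for message_id, fields in r.xclaim(
            "jobs", "workers", "rescuer", min_idle_time=300_000, message_ids=stale
        ):
            print(message_id, fields)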
@@ -194,8 +208,8 @@ def get_status(self):
         """
         Returns a map of stats
         """
-        nb_messages = self._redis.xlen(self._name)
-        pending = self._redis.xpending(self._name, STREAM_GROUP)
+        nb_messages = self._slave.xlen(self._name)
+        pending = self._slave.xpending(self._name, STREAM_GROUP)
         tiles_in_error = self._get_errors()
 
         stats.set_gauge(["redis", self._name_str, "nb_messages"], nb_messages)
@@ -206,10 +220,10 @@ def get_status(self):
         }
 
     def _get_errors(self):
-        now, now_us = self._redis.time()
+        now, now_us = self._slave.time()
         old_timestamp = (now - self._max_errors_age) * 1000 + now_us / 1000
 
-        errors = self._redis.xrange(name=self._errors_name)
+        errors = self._slave.xrange(name=self._errors_name)
         tiles_in_error = set()
         old_errors = []
         for error_id, error_message in errors:
for error_id, error_message in errors:
Expand All @@ -220,5 +234,5 @@ def _get_errors(self):
tiles_in_error.add(error_message[b"tilecoord"].decode())
if old_errors:
logger.info("Deleting %d old errors", len(old_errors))
self._redis.xdel(self._errors_name, *old_errors)
self._master.xdel(self._errors_name, *old_errors)
return tiles_in_error
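_get_errors leans on the fact that stream entry IDs embed a millisecond timestamp (for example b"1591272000000-0") and uses the server's own clock (TIME) to avoid client skew when pruning old entries. A sketch of that pruning step under the same assumptions (the stream name is hypothetical):

    import redis

    r = redis.Redis()
    now_s, now_us = r.time()  # server time as (seconds, microseconds)
    cutoff_ms = (now_s - 24 * 3600) * 1000 + now_us / 1000
    old = [
        entry_id
        for entry_id, _ in r.xrange(name="tilecloud_errors")
        if int(entry_id.split(b"-")[0]) < cutoff_ms
    ]
    if old:
        r.xdel("tilecloud_errors", *old)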
