diff --git a/tilecloud/store/redis.py b/tilecloud/store/redis.py index 69f3a7bbd..5c50e4c34 100644 --- a/tilecloud/store/redis.py +++ b/tilecloud/store/redis.py @@ -4,7 +4,7 @@ import os import socket -import redis +import redis.sentinel from c2cwsgiutils import stats from tilecloud import TileStore @@ -23,7 +23,7 @@ class RedisTileStore(TileStore): def __init__( self, - url, + url=None, name="tilecloud", stop_if_empty=True, timeout=5, @@ -31,10 +31,24 @@ def __init__( max_retries=5, max_errors_age=24 * 3600, max_errors_nb=100, + sentinels=None, + service_name="mymaster", + sentinel_kwargs=None, + connection_kwargs={}, **kwargs, ): - super(RedisTileStore, self).__init__(**kwargs) - self._redis = redis.Redis.from_url(url) + super().__init__(**kwargs) + + if sentinels is not None: + sentinel = redis.sentinel.Sentinel( + sentinels, sentinel_kwargs=sentinel_kwargs, **connection_kwargs + ) + self._master = sentinel.master_for(service_name) + self._slave = sentinel.slave_for(service_name) + else: + self._master = redis.Redis.from_url(url, **connection_kwargs) + self._slave = self._master + self._stop_if_empty = stop_if_empty self._timeout_ms = int(timeout * 1000) self._pending_timeout_ms = int(pending_timeout * 1000) @@ -47,7 +61,7 @@ def __init__( self._name = name.encode("utf-8") self._errors_name = self._name + b"_errors" try: - self._redis.xgroup_create(name=self._name, groupname=STREAM_GROUP, id="0-0", mkstream=True) + self._master.xgroup_create(name=self._name, groupname=STREAM_GROUP, id="0-0", mkstream=True) except redis.ResponseError as e: if "BUSYGROUP" not in str(e): raise @@ -62,7 +76,7 @@ def get_one(tile): def list(self): count = 0 while True: - queues = self._redis.xreadgroup( + queues = self._master.xreadgroup( groupname=STREAM_GROUP, consumername=CONSUMER_NAME, streams={self._name: ">"}, @@ -93,16 +107,16 @@ def list(self): if count % 10 == 0: stats.set_gauge( - ["redis", self._name_str, "nb_messages"], self._redis.xlen(name=self._name) + ["redis", self._name_str, "nb_messages"], self._slave.xlen(name=self._name) ) - pending = self._redis.xpending(self._name, STREAM_GROUP) + pending = self._slave.xpending(self._name, STREAM_GROUP) stats.set_gauge(["redis", self._name_str, "pending"], pending["pending"]) def put_one(self, tile): try: - self._redis.xadd(name=self._name, fields={"message": encode_message(tile)}) + self._master.xadd(name=self._name, fields={"message": encode_message(tile)}) except Exception as e: - logger.warning("Failed sending SQS message", exc_info=True) + logger.warning("Failed sending Redis message", exc_info=True) tile.error = e def put(self, tiles): @@ -115,22 +129,22 @@ def delete_one(self, tile): assert hasattr(tile, "from_redis") assert hasattr(tile, "sqs_message") assert tile.from_redis is True - self._redis.xack(self._name, STREAM_GROUP, tile.sqs_message) - self._redis.xdel(self._name, tile.sqs_message) + self._master.xack(self._name, STREAM_GROUP, tile.sqs_message) + self._master.xdel(self._name, tile.sqs_message) return tile def delete_all(self): """ Used only by tests """ - self._redis.xtrim(name=self._name, maxlen=0) + self._master.xtrim(name=self._name, maxlen=0) # xtrim doesn't empty the group claims. So we have to delete and re-create groups - self._redis.xgroup_destroy(name=self._name, groupname=STREAM_GROUP) - self._redis.xgroup_create(name=self._name, groupname=STREAM_GROUP, id="0-0", mkstream=True) - self._redis.xtrim(name=self._errors_name, maxlen=0) + self._master.xgroup_destroy(name=self._name, groupname=STREAM_GROUP) + self._master.xgroup_create(name=self._name, groupname=STREAM_GROUP, id="0-0", mkstream=True) + self._master.xtrim(name=self._errors_name, maxlen=0) def _claim_olds(self): - pendings = self._redis.xpending_range( + pendings = self._master.xpending_range( name=self._name, groupname=STREAM_GROUP, min="-", max="+", count=10 ) if not pendings: @@ -157,7 +171,7 @@ def _claim_olds(self): to_drop.append(id_) if to_drop: - drop_messages = self._redis.xclaim( + drop_messages = self._master.xclaim( name=self._name, groupname=STREAM_GROUP, consumername=CONSUMER_NAME, @@ -165,11 +179,11 @@ def _claim_olds(self): message_ids=to_drop, ) drop_ids = [drop_message[0] for drop_message in drop_messages] - self._redis.xack(self._name, STREAM_GROUP, *drop_ids) - self._redis.xdel(self._name, *drop_ids) + self._master.xack(self._name, STREAM_GROUP, *drop_ids) + self._master.xdel(self._name, *drop_ids) for drop_id, drop_message in drop_messages: tile = decode_message(drop_message[b"message"]) - self._redis.xadd( + self._master.xadd( name=self._errors_name, fields=dict(tilecoord=str(tile.tilecoord)), maxlen=self._max_errors_nb, @@ -177,7 +191,7 @@ def _claim_olds(self): stats.increment_counter(["redis", self._name_str, "dropped"], len(to_drop)) if to_steal: - messages = self._redis.xclaim( + messages = self._master.xclaim( name=self._name, groupname=STREAM_GROUP, consumername=CONSUMER_NAME, @@ -194,8 +208,8 @@ def get_status(self): """ Returns a map of stats """ - nb_messages = self._redis.xlen(self._name) - pending = self._redis.xpending(self._name, STREAM_GROUP) + nb_messages = self._slave.xlen(self._name) + pending = self._slave.xpending(self._name, STREAM_GROUP) tiles_in_error = self._get_errors() stats.set_gauge(["redis", self._name_str, "nb_messages"], nb_messages) @@ -206,10 +220,10 @@ def get_status(self): } def _get_errors(self): - now, now_us = self._redis.time() + now, now_us = self._slave.time() old_timestamp = (now - self._max_errors_age) * 1000 + now_us / 1000 - errors = self._redis.xrange(name=self._errors_name) + errors = self._slave.xrange(name=self._errors_name) tiles_in_error = set() old_errors = [] for error_id, error_message in errors: @@ -220,5 +234,5 @@ def _get_errors(self): tiles_in_error.add(error_message[b"tilecoord"].decode()) if old_errors: logger.info("Deleting %d old errors", len(old_errors)) - self._redis.xdel(self._errors_name, *old_errors) + self._master.xdel(self._errors_name, *old_errors) return tiles_in_error