Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Be able to use Redis sentinel #487

Merged
merged 1 commit into from
Jun 11, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 41 additions & 27 deletions tilecloud/store/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import os
import socket

import redis
import redis.sentinel

from c2cwsgiutils import stats
from tilecloud import TileStore
Expand All @@ -23,18 +23,32 @@ class RedisTileStore(TileStore):

def __init__(
self,
url,
url=None,
name="tilecloud",
stop_if_empty=True,
timeout=5,
pending_timeout=5 * 60,
max_retries=5,
max_errors_age=24 * 3600,
max_errors_nb=100,
sentinels=None,
service_name="mymaster",
sentinel_kwargs=None,
connection_kwargs={},
**kwargs,
):
super(RedisTileStore, self).__init__(**kwargs)
self._redis = redis.Redis.from_url(url)
super().__init__(**kwargs)

if sentinels is not None:
sentinel = redis.sentinel.Sentinel(
sentinels, sentinel_kwargs=sentinel_kwargs, **connection_kwargs
)
self._master = sentinel.master_for(service_name)
self._slave = sentinel.slave_for(service_name)
else:
self._master = redis.Redis.from_url(url, **connection_kwargs)
self._slave = self._master

self._stop_if_empty = stop_if_empty
self._timeout_ms = int(timeout * 1000)
self._pending_timeout_ms = int(pending_timeout * 1000)
Expand All @@ -47,7 +61,7 @@ def __init__(
self._name = name.encode("utf-8")
self._errors_name = self._name + b"_errors"
try:
self._redis.xgroup_create(name=self._name, groupname=STREAM_GROUP, id="0-0", mkstream=True)
self._master.xgroup_create(name=self._name, groupname=STREAM_GROUP, id="0-0", mkstream=True)
except redis.ResponseError as e:
if "BUSYGROUP" not in str(e):
raise
Expand All @@ -62,7 +76,7 @@ def get_one(tile):
def list(self):
count = 0
while True:
queues = self._redis.xreadgroup(
queues = self._master.xreadgroup(
groupname=STREAM_GROUP,
consumername=CONSUMER_NAME,
streams={self._name: ">"},
Expand Down Expand Up @@ -93,16 +107,16 @@ def list(self):

if count % 10 == 0:
stats.set_gauge(
["redis", self._name_str, "nb_messages"], self._redis.xlen(name=self._name)
["redis", self._name_str, "nb_messages"], self._slave.xlen(name=self._name)
)
pending = self._redis.xpending(self._name, STREAM_GROUP)
pending = self._slave.xpending(self._name, STREAM_GROUP)
stats.set_gauge(["redis", self._name_str, "pending"], pending["pending"])

def put_one(self, tile):
try:
self._redis.xadd(name=self._name, fields={"message": encode_message(tile)})
self._master.xadd(name=self._name, fields={"message": encode_message(tile)})
except Exception as e:
logger.warning("Failed sending SQS message", exc_info=True)
logger.warning("Failed sending Redis message", exc_info=True)
tile.error = e

def put(self, tiles):
Expand All @@ -115,22 +129,22 @@ def delete_one(self, tile):
assert hasattr(tile, "from_redis")
assert hasattr(tile, "sqs_message")
assert tile.from_redis is True
self._redis.xack(self._name, STREAM_GROUP, tile.sqs_message)
self._redis.xdel(self._name, tile.sqs_message)
self._master.xack(self._name, STREAM_GROUP, tile.sqs_message)
self._master.xdel(self._name, tile.sqs_message)
return tile

def delete_all(self):
"""
Used only by tests
"""
self._redis.xtrim(name=self._name, maxlen=0)
self._master.xtrim(name=self._name, maxlen=0)
# xtrim doesn't empty the group claims. So we have to delete and re-create groups
self._redis.xgroup_destroy(name=self._name, groupname=STREAM_GROUP)
self._redis.xgroup_create(name=self._name, groupname=STREAM_GROUP, id="0-0", mkstream=True)
self._redis.xtrim(name=self._errors_name, maxlen=0)
self._master.xgroup_destroy(name=self._name, groupname=STREAM_GROUP)
self._master.xgroup_create(name=self._name, groupname=STREAM_GROUP, id="0-0", mkstream=True)
self._master.xtrim(name=self._errors_name, maxlen=0)

def _claim_olds(self):
pendings = self._redis.xpending_range(
pendings = self._master.xpending_range(
name=self._name, groupname=STREAM_GROUP, min="-", max="+", count=10
)
if not pendings:
Expand All @@ -157,27 +171,27 @@ def _claim_olds(self):
to_drop.append(id_)

if to_drop:
drop_messages = self._redis.xclaim(
drop_messages = self._master.xclaim(
name=self._name,
groupname=STREAM_GROUP,
consumername=CONSUMER_NAME,
min_idle_time=self._pending_timeout_ms,
message_ids=to_drop,
)
drop_ids = [drop_message[0] for drop_message in drop_messages]
self._redis.xack(self._name, STREAM_GROUP, *drop_ids)
self._redis.xdel(self._name, *drop_ids)
self._master.xack(self._name, STREAM_GROUP, *drop_ids)
self._master.xdel(self._name, *drop_ids)
for drop_id, drop_message in drop_messages:
tile = decode_message(drop_message[b"message"])
self._redis.xadd(
self._master.xadd(
name=self._errors_name,
fields=dict(tilecoord=str(tile.tilecoord)),
maxlen=self._max_errors_nb,
)
stats.increment_counter(["redis", self._name_str, "dropped"], len(to_drop))

if to_steal:
messages = self._redis.xclaim(
messages = self._master.xclaim(
name=self._name,
groupname=STREAM_GROUP,
consumername=CONSUMER_NAME,
Expand All @@ -194,8 +208,8 @@ def get_status(self):
"""
Returns a map of stats
"""
nb_messages = self._redis.xlen(self._name)
pending = self._redis.xpending(self._name, STREAM_GROUP)
nb_messages = self._slave.xlen(self._name)
pending = self._slave.xpending(self._name, STREAM_GROUP)
tiles_in_error = self._get_errors()

stats.set_gauge(["redis", self._name_str, "nb_messages"], nb_messages)
Expand All @@ -206,10 +220,10 @@ def get_status(self):
}

def _get_errors(self):
now, now_us = self._redis.time()
now, now_us = self._slave.time()
old_timestamp = (now - self._max_errors_age) * 1000 + now_us / 1000

errors = self._redis.xrange(name=self._errors_name)
errors = self._slave.xrange(name=self._errors_name)
tiles_in_error = set()
old_errors = []
for error_id, error_message in errors:
Expand All @@ -220,5 +234,5 @@ def _get_errors(self):
tiles_in_error.add(error_message[b"tilecoord"].decode())
if old_errors:
logger.info("Deleting %d old errors", len(old_errors))
self._redis.xdel(self._errors_name, *old_errors)
self._master.xdel(self._errors_name, *old_errors)
return tiles_in_error