Skip to content

Commit

Permalink
feat: 超大订阅方案 (closed #2429)
Browse files Browse the repository at this point in the history
  • Loading branch information
wyyalt committed Dec 12, 2024
1 parent df5ad07 commit 4d5df9c
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 23 deletions.
8 changes: 3 additions & 5 deletions apps/backend/subscription/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -899,7 +899,7 @@ def get_instances_by_scope_with_checker(

@support_multi_biz
@SetupObserve(histogram=metrics.app_task_get_instances_by_scope_duration_seconds, get_labels_func=get_scope_labels_func)
@FuncCacheDecorator(cache_time=SUBSCRIPTION_SCOPE_CACHE_TIME)
@FuncCacheDecorator(cache_time=SUBSCRIPTION_SCOPE_CACHE_TIME, uuid_cache_enable=True)
def get_instances_by_scope(
scope: Dict[str, Union[Dict, int, Any]], data_backend: str
) -> Dict[str, Dict[str, Union[Dict, Any]]]:
Expand Down Expand Up @@ -1046,9 +1046,7 @@ def get_instances_by_scope(
)

if scope["with_info"]["process"]:
instances: Union[RedisList, list] = add_process_info_to_instances(
bk_biz_id, scope, instances, data_backend
)
instances: Union[RedisList, list] = add_process_info_to_instances(bk_biz_id, scope, instances, data_backend)

instances_dict: typing.Union[RedisDict, dict] = DynamicContainer(data_backend=data_backend).container
data = {
Expand Down Expand Up @@ -1109,7 +1107,7 @@ def add_host_info_to_instances(bk_biz_id: int, scope: Dict, instances: RedisList
return redis_instances if is_redis_data_backend else instances

host_dict = DynamicContainer(data_backend=data_backend).container

hosts_detail: Union[RedisList, list] = get_host_detail(
[instance["service"] for instance in instances],
bk_biz_id=bk_biz_id,
Expand Down
27 changes: 25 additions & 2 deletions apps/core/concurrent/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
"""

import typing
import uuid

import ujson as json
import wrapt
Expand All @@ -19,6 +20,7 @@
from apps.prometheus import metrics
from apps.prometheus.helper import observe
from apps.utils.cache import format_cache_key
from apps.utils.redis import RedisDict, RedisList
from env.constants import CacheBackend

DEFAULT_CACHE_TIME = 60 * 15
Expand All @@ -28,11 +30,12 @@ class FuncCacheDecorator:

cache_time: int = DEFAULT_CACHE_TIME

def __init__(self, cache_time: typing.Optional[int] = None):
def __init__(self, cache_time: typing.Optional[int] = None, uuid_cache_enable: bool = False):
"""
:param cache_time: 缓存事件(秒)
"""
self.cache_time = cache_time or DEFAULT_CACHE_TIME
self.uuid_cache_enable = uuid_cache_enable

def get_from_cache(self, using: str, key: str) -> typing.Any:
cache = caches[using]
Expand All @@ -42,12 +45,33 @@ def get_from_cache(self, using: str, key: str) -> typing.Any:

if using == CacheBackend.DB.value:
return json.loads(func_result)

if self.uuid_cache_enable:
if isinstance(func_result, str) and func_result.startswith("data_backend_redis"):
data_type = func_result.split("_")[-2]
if data_type == "dict":
return RedisDict(cache_uuid_key=func_result)
else:
return RedisList(cache_uuid_key=func_result)

return func_result

def set_to_cache(self, using: str, key: str, value: typing.Any):
cache = caches[using]
if using == CacheBackend.DB.value:
value = json.dumps(value)

if self.uuid_cache_enable:
# 变量上下文会将原本的uuid_key删除,这里重新命名,即可达到缓存目的
if isinstance(value, RedisDict):
new_uuid_key = f"data_backend_redis_dict_{uuid.uuid4().hex}"
else:
new_uuid_key = f"data_backend_redis_list_{uuid.uuid4().hex}"
value._update_redis_expiry(self.cache_time)
value.client.rename(value.uuid_key, new_uuid_key)
value.cache_uuid_key = new_uuid_key
value = new_uuid_key

cache.set(key, value, self.cache_time)

def ttl_from_cache(self, using: str, key: str) -> int:
Expand Down Expand Up @@ -76,7 +100,6 @@ def __call__(
:param kwargs: 关键字参数
:return:
"""

func_result: typing.Any = None
func_name: str = wrapped.__name__
use_fast_cache: bool = False
Expand Down
36 changes: 20 additions & 16 deletions apps/utils/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,13 @@ def __iter__(self) -> Generator[Tuple[str, str], None, None]:


class RedisDataBase:
def __init__(self):
self.uuid_key = f"{uuid.uuid4().hex}"
def __init__(self, uuid_key: str = None, cache_uuid_key: str = None):
self.cache_uuid_key = cache_uuid_key
self.uuid_key = uuid_key or f"{uuid.uuid4().hex}"
self.client = get_redis_connection()

def _update_redis_expiry(self):
self.client.expire(self.uuid_key, REDIS_CACHE_DATA_TIMEOUT)
def _update_redis_expiry(self, cache_time=None):
self.client.expire(self.cache_uuid_key or self.uuid_key, cache_time or REDIS_CACHE_DATA_TIMEOUT)

def __del__(self):
self.client.delete(self.uuid_key)
Expand All @@ -86,34 +87,37 @@ def __exit__(self, *args, **kwargs):

class RedisDict(RedisDataBase, dict):
def __setitem__(self, key, value):
self.client.hset(self.uuid_key, key, json.dumps(value))
self.client.hset(self.cache_uuid_key or self.uuid_key, key, json.dumps(value))
self._update_redis_expiry()

def update(self, *args, **kwargs):
temp_dict = {}
for k, v in dict(*args, **kwargs).items():
temp_dict[k] = json.dumps(v)
if temp_dict:
self.client.hset(self.uuid_key, mapping=temp_dict)
self.client.hset(self.cache_uuid_key or self.uuid_key, mapping=temp_dict)
self._update_redis_expiry()

def __getitem__(self, key: Any) -> Any:
return json.loads(self.client.hget(self.uuid_key, key) or "null")
return json.loads(self.client.hget(self.cache_uuid_key or self.uuid_key, key) or "null")

def __len__(self) -> int:
return self.client.hlen(self.uuid_key)
return self.client.hlen(self.cache_uuid_key or self.uuid_key)

def keys(self) -> dict_keys:
return self.client.hkeys(self.uuid_key)
return self.client.hkeys(self.cache_uuid_key or self.uuid_key)

def get(self, key: Any, default=None):
return self.__getitem__(key) or default

def values(self):
return RedisHashValuesScanner(self.uuid_key)
return RedisHashValuesScanner(self.cache_uuid_key or self.uuid_key)

def items(self):
return RedisHashScanner(self.uuid_key)
return RedisHashScanner(self.cache_uuid_key or self.uuid_key)

def __str__(self):
return self.uuid_key


class RedisList(RedisDataBase, list):
Expand All @@ -122,8 +126,8 @@ def __iter__(self) -> Iterator:
return self

def __next__(self):
if self.index < self.client.llen(self.uuid_key):
item = self.client.lindex(self.uuid_key, self.index)
if self.index < self.client.llen(self.cache_uuid_key or self.uuid_key):
item = self.client.lindex(self.cache_uuid_key or self.uuid_key, self.index)
self.index += 1
return json.loads(item)
else:
Expand All @@ -132,15 +136,15 @@ def __next__(self):
def extend(self, iterable: Iterable[Any]) -> None:
serialized_items = [json.dumps(item) for item in iterable]
if serialized_items:
self.client.rpush(self.uuid_key, *serialized_items)
self.client.rpush(self.cache_uuid_key or self.uuid_key, *serialized_items)
self._update_redis_expiry()

def append(self, obj: Any) -> None:
self.client.rpush(self.uuid_key, json.dumps(obj))
self.client.rpush(self.cache_uuid_key or self.uuid_key, json.dumps(obj))
self._update_redis_expiry()

def __len__(self) -> int:
return self.client.llen(self.uuid_key)
return self.client.llen(self.cache_uuid_key or self.uuid_key)


class DynamicContainer:
Expand Down

0 comments on commit 4d5df9c

Please sign in to comment.