diff --git a/apps/backend/subscription/tools.py b/apps/backend/subscription/tools.py index 8e02b181c2..2e5f1e396f 100644 --- a/apps/backend/subscription/tools.py +++ b/apps/backend/subscription/tools.py @@ -899,7 +899,7 @@ def get_instances_by_scope_with_checker( @support_multi_biz @SetupObserve(histogram=metrics.app_task_get_instances_by_scope_duration_seconds, get_labels_func=get_scope_labels_func) -@FuncCacheDecorator(cache_time=SUBSCRIPTION_SCOPE_CACHE_TIME) +@FuncCacheDecorator(cache_time=SUBSCRIPTION_SCOPE_CACHE_TIME, uuid_cache_enable=True) def get_instances_by_scope( scope: Dict[str, Union[Dict, int, Any]], data_backend: str ) -> Dict[str, Dict[str, Union[Dict, Any]]]: @@ -1046,9 +1046,7 @@ def get_instances_by_scope( ) if scope["with_info"]["process"]: - instances: Union[RedisList, list] = add_process_info_to_instances( - bk_biz_id, scope, instances, data_backend - ) + instances: Union[RedisList, list] = add_process_info_to_instances(bk_biz_id, scope, instances, data_backend) instances_dict: typing.Union[RedisDict, dict] = DynamicContainer(data_backend=data_backend).container data = { @@ -1109,7 +1107,7 @@ def add_host_info_to_instances(bk_biz_id: int, scope: Dict, instances: RedisList return redis_instances if is_redis_data_backend else instances host_dict = DynamicContainer(data_backend=data_backend).container - + hosts_detail: Union[RedisList, list] = get_host_detail( [instance["service"] for instance in instances], bk_biz_id=bk_biz_id, diff --git a/apps/core/concurrent/cache.py b/apps/core/concurrent/cache.py index af3c85955d..ca04bc878d 100644 --- a/apps/core/concurrent/cache.py +++ b/apps/core/concurrent/cache.py @@ -10,6 +10,7 @@ """ import typing +import uuid import ujson as json import wrapt @@ -19,6 +20,7 @@ from apps.prometheus import metrics from apps.prometheus.helper import observe from apps.utils.cache import format_cache_key +from apps.utils.redis import RedisDict, RedisList from env.constants import CacheBackend DEFAULT_CACHE_TIME = 60 * 15 @@ -28,11 +30,12 @@ class FuncCacheDecorator: cache_time: int = DEFAULT_CACHE_TIME - def __init__(self, cache_time: typing.Optional[int] = None): + def __init__(self, cache_time: typing.Optional[int] = None, uuid_cache_enable: bool = False): """ :param cache_time: 缓存事件(秒) """ self.cache_time = cache_time or DEFAULT_CACHE_TIME + self.uuid_cache_enable = uuid_cache_enable def get_from_cache(self, using: str, key: str) -> typing.Any: cache = caches[using] @@ -42,12 +45,33 @@ def get_from_cache(self, using: str, key: str) -> typing.Any: if using == CacheBackend.DB.value: return json.loads(func_result) + + if self.uuid_cache_enable: + if isinstance(func_result, str) and func_result.startswith("data_backend_redis"): + data_type = func_result.split("_")[-2] + if data_type == "dict": + return RedisDict(cache_uuid_key=func_result) + else: + return RedisList(cache_uuid_key=func_result) + return func_result def set_to_cache(self, using: str, key: str, value: typing.Any): cache = caches[using] if using == CacheBackend.DB.value: value = json.dumps(value) + + if self.uuid_cache_enable: + # 变量上下文会将原本的uuid_key删除,这里重新命名,即可达到缓存目的 + if isinstance(value, RedisDict): + new_uuid_key = f"data_backend_redis_dict_{uuid.uuid4().hex}" + else: + new_uuid_key = f"data_backend_redis_list_{uuid.uuid4().hex}" + value._update_redis_expiry(self.cache_time) + value.client.rename(value.uuid_key, new_uuid_key) + value.cache_uuid_key = new_uuid_key + value = new_uuid_key + cache.set(key, value, self.cache_time) def ttl_from_cache(self, using: str, key: str) -> int: @@ -76,7 +100,6 @@ def __call__( :param kwargs: 关键字参数 :return: """ - func_result: typing.Any = None func_name: str = wrapped.__name__ use_fast_cache: bool = False diff --git a/apps/utils/redis.py b/apps/utils/redis.py index 4c8c7a9aee..ceab8a9df7 100644 --- a/apps/utils/redis.py +++ b/apps/utils/redis.py @@ -67,12 +67,13 @@ def __iter__(self) -> Generator[Tuple[str, str], None, None]: class RedisDataBase: - def __init__(self): - self.uuid_key = f"{uuid.uuid4().hex}" + def __init__(self, uuid_key: str = None, cache_uuid_key: str = None): + self.cache_uuid_key = cache_uuid_key + self.uuid_key = uuid_key or f"{uuid.uuid4().hex}" self.client = get_redis_connection() - def _update_redis_expiry(self): - self.client.expire(self.uuid_key, REDIS_CACHE_DATA_TIMEOUT) + def _update_redis_expiry(self, cache_time=None): + self.client.expire(self.cache_uuid_key or self.uuid_key, cache_time or REDIS_CACHE_DATA_TIMEOUT) def __del__(self): self.client.delete(self.uuid_key) @@ -86,7 +87,7 @@ def __exit__(self, *args, **kwargs): class RedisDict(RedisDataBase, dict): def __setitem__(self, key, value): - self.client.hset(self.uuid_key, key, json.dumps(value)) + self.client.hset(self.cache_uuid_key or self.uuid_key, key, json.dumps(value)) self._update_redis_expiry() def update(self, *args, **kwargs): @@ -94,26 +95,29 @@ def update(self, *args, **kwargs): for k, v in dict(*args, **kwargs).items(): temp_dict[k] = json.dumps(v) if temp_dict: - self.client.hset(self.uuid_key, mapping=temp_dict) + self.client.hset(self.cache_uuid_key or self.uuid_key, mapping=temp_dict) self._update_redis_expiry() def __getitem__(self, key: Any) -> Any: - return json.loads(self.client.hget(self.uuid_key, key) or "null") + return json.loads(self.client.hget(self.cache_uuid_key or self.uuid_key, key) or "null") def __len__(self) -> int: - return self.client.hlen(self.uuid_key) + return self.client.hlen(self.cache_uuid_key or self.uuid_key) def keys(self) -> dict_keys: - return self.client.hkeys(self.uuid_key) + return self.client.hkeys(self.cache_uuid_key or self.uuid_key) def get(self, key: Any, default=None): return self.__getitem__(key) or default def values(self): - return RedisHashValuesScanner(self.uuid_key) + return RedisHashValuesScanner(self.cache_uuid_key or self.uuid_key) def items(self): - return RedisHashScanner(self.uuid_key) + return RedisHashScanner(self.cache_uuid_key or self.uuid_key) + + def __str__(self): + return self.uuid_key class RedisList(RedisDataBase, list): @@ -122,8 +126,8 @@ def __iter__(self) -> Iterator: return self def __next__(self): - if self.index < self.client.llen(self.uuid_key): - item = self.client.lindex(self.uuid_key, self.index) + if self.index < self.client.llen(self.cache_uuid_key or self.uuid_key): + item = self.client.lindex(self.cache_uuid_key or self.uuid_key, self.index) self.index += 1 return json.loads(item) else: @@ -132,15 +136,15 @@ def __next__(self): def extend(self, iterable: Iterable[Any]) -> None: serialized_items = [json.dumps(item) for item in iterable] if serialized_items: - self.client.rpush(self.uuid_key, *serialized_items) + self.client.rpush(self.cache_uuid_key or self.uuid_key, *serialized_items) self._update_redis_expiry() def append(self, obj: Any) -> None: - self.client.rpush(self.uuid_key, json.dumps(obj)) + self.client.rpush(self.cache_uuid_key or self.uuid_key, json.dumps(obj)) self._update_redis_expiry() def __len__(self) -> int: - return self.client.llen(self.uuid_key) + return self.client.llen(self.cache_uuid_key or self.uuid_key) class DynamicContainer: