diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 051a9a047e..635b315c24 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -11,7 +11,7 @@ repos: name: black language: python - repo: https://github.com/pycqa/isort - rev: 5.5.4 + rev: 5.12.0 hooks: - id: isort args: [ "--profile", "black" ] diff --git a/apps/backend/agent/manager.py b/apps/backend/agent/manager.py index 1fa8393d3a..0b70aed5fe 100644 --- a/apps/backend/agent/manager.py +++ b/apps/backend/agent/manager.py @@ -22,6 +22,7 @@ class AgentServiceActivity(ServiceActivity): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) + self.is_multi_paralle_gateway: bool = False self.component.inputs.meta = Var(type=Var.SPLICE, value="${meta}") self.component.inputs.description = Var(type=Var.SPLICE, value="${description}") self.component.inputs.blueking_language = Var(type=Var.SPLICE, value="${blueking_language}") diff --git a/apps/backend/components/collections/agent_new/base.py b/apps/backend/components/collections/agent_new/base.py index 3c8fad0cd1..3a1da6ee4e 100644 --- a/apps/backend/components/collections/agent_new/base.py +++ b/apps/backend/components/collections/agent_new/base.py @@ -32,7 +32,7 @@ from apps.prometheus.helper import SetupObserve from .. import job -from ..base import BaseService, CommonData +from ..base import BaseService, CommonData, RedisCommonData logger = logging.getLogger("celery") @@ -49,6 +49,29 @@ class AgentCommonData(CommonData): injected_ap_id: int +class RedisAgentCommonData(RedisCommonData): + + # 默认接入点 + @property + def default_ap(self) -> models.AccessPoint: + return self._get_attr_from_redis("default_ap") + + # 主机ID - 接入点 映射关系 + @property + def host_id__ap_map(self) -> Dict[int, models.AccessPoint]: + return self._get_attr_from_redis("host_id__ap_map") + + # AgentStep 适配器 + @property + def agent_step_adapter(self) -> AgentStepAdapter: + return self._get_attr_from_redis("agent_step_adapter") + + # 注入AP_ID + @property + def injected_ap_id(self) -> int: + return self._get_attr_from_redis("injected_ap_id") + + class AgentBaseService(BaseService, metaclass=abc.ABCMeta): """ AGENT安装基类 @@ -91,7 +114,9 @@ def get_common_data(cls, data): common_data.subscription_step, gse_version=data.get_one_of_inputs("meta", {}).get("GSE_VERSION") ) - return AgentCommonData( + agent_common_data_cls = AgentCommonData if isinstance(common_data, CommonData) else RedisAgentCommonData + + return agent_common_data_cls( bk_host_ids=common_data.bk_host_ids, host_id_obj_map=common_data.host_id_obj_map, ap_id_obj_map=common_data.ap_id_obj_map, diff --git a/apps/backend/components/collections/base.py b/apps/backend/components/collections/base.py index 8cb1abf86c..1a3d80015d 100644 --- a/apps/backend/components/collections/base.py +++ b/apps/backend/components/collections/base.py @@ -8,10 +8,13 @@ an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ +import ast import logging import os +import pickle import traceback import typing +import uuid from collections import defaultdict from dataclasses import dataclass from typing import ( @@ -32,6 +35,7 @@ from django.db.models.functions import Concat from django.utils import timezone from django.utils.translation import ugettext as _ +from django_redis import get_redis_connection from apps.adapters.api.gse import GseApiBaseHelper, get_gse_api_helper from apps.backend.api.constants import POLLING_TIMEOUT @@ -45,6 +49,7 @@ from apps.prometheus.helper import SetupObserve from apps.utils import cache, time_handler, translation from apps.utils.exc import ExceptionHandler +from apps.utils.redis import REDIS_CACHE_DATA_TIMEOUT from pipeline.core.flow import Service logger = logging.getLogger("celery") @@ -263,6 +268,87 @@ class CommonData: subscription_instance_ids: Set[int] +@dataclass +class DataClassFields: + name: str + + +class RedisCommonData: + def __init__(self, *args, **kwargs): + self.uuid_key = uuid.uuid4().hex + self.client = get_redis_connection() + + for k, v in dict(*args, **kwargs).items(): + self.client.hset(self.uuid_key, k, str(pickle.dumps(v))) + + self.client.expire(self.uuid_key, REDIS_CACHE_DATA_TIMEOUT) + + def _get_attr_from_redis(self, key): + return pickle.loads(ast.literal_eval(self.client.hget(self.uuid_key, key))) + + def __del__(self): + self.client.delete(self.uuid_key) + + def __enter__(self): + return self + + def __exit__(self, *args, **kwargs): + self.__del__() + + @property + def __dataclass_fields__(self): + # 遍历类的 __dict__ 属性 + for attr_name, attr_value in self.__class__.__dict__.items(): + # 检查属性值是否是一个 property 对象 + if isinstance(attr_value, property) and not attr_name == "__dataclass_fields__": + # 如果是,则将其名称添加到列表中 + yield DataClassFields(name=attr_name) + + @property + def bk_host_ids(self) -> Set[int]: + return self._get_attr_from_redis("bk_host_ids") + + @property + def host_id_obj_map(self) -> Dict[int, models.Host]: + return self._get_attr_from_redis("host_id_obj_map") + + @property + def sub_inst_id__host_id_map(self) -> Dict[int, int]: + return self._get_attr_from_redis("sub_inst_id__host_id_map") + + @property + def host_id__sub_inst_id_map(self) -> Dict[int, int]: + return self._get_attr_from_redis("host_id__sub_inst_id_map") + + @property + def ap_id_obj_map(self) -> Dict[int, models.AccessPoint]: + return self._get_attr_from_redis("ap_id_obj_map") + + @property + def sub_inst_id__sub_inst_obj_map(self) -> Dict[int, models.SubscriptionInstanceRecord]: + return self._get_attr_from_redis("sub_inst_id__sub_inst_obj_map") + + @property + def gse_api_helper(self) -> GseApiBaseHelper: + return self._get_attr_from_redis("gse_api_helper") + + @property + def subscription(self) -> models.Subscription: + return self._get_attr_from_redis("subscription") + + @property + def subscription_step(self) -> models.SubscriptionStep: + return self._get_attr_from_redis("subscription_step") + + @property + def subscription_instances(self) -> List[models.SubscriptionInstanceRecord]: + return self._get_attr_from_redis("subscription_instances") + + @property + def subscription_instance_ids(self) -> Set[int]: + return self._get_attr_from_redis("subscription_instance_ids") + + class BaseService(Service, LogMixin, DBHelperMixin, PollingTimeoutMixin): # 失败订阅实例ID - 失败原因 映射关系 @@ -447,7 +533,10 @@ def get_common_data(cls, data): break ap_id_obj_map = models.AccessPoint.ap_id_obj_map() - return CommonData( + + common_data_cls = RedisCommonData if data.get_one_of_inputs("is_multi_paralle_gateway") else CommonData + + return common_data_cls( bk_host_ids=bk_host_ids, host_id_obj_map=host_id_obj_map, sub_inst_id__host_id_map=sub_inst_id__host_id_map, @@ -610,6 +699,9 @@ def inputs_format(self): ), Service.InputItem(name="subscription_step_id", key="subscription_step_id", type="int", required=True), Service.InputItem(name="blueking_language", key="blueking_language", type="str", required=True), + Service.InputItem( + name="is_multi_paralle_gateway", key="is_multi_paralle_gateway", type="bool", required=True + ), ] def outputs_format(self): diff --git a/apps/backend/components/collections/plugin.py b/apps/backend/components/collections/plugin.py index 40763de3a9..1d0e021f9c 100644 --- a/apps/backend/components/collections/plugin.py +++ b/apps/backend/components/collections/plugin.py @@ -31,7 +31,11 @@ GseDataErrCode, ) from apps.backend.api.job import process_parms -from apps.backend.components.collections.base import BaseService, CommonData +from apps.backend.components.collections.base import ( + BaseService, + CommonData, + RedisCommonData, +) from apps.backend.components.collections.common.script_content import INITIALIZE_SCRIPT from apps.backend.components.collections.job import ( JobExecuteScriptService, @@ -86,6 +90,34 @@ def __post_init__(self): self.plugin_name = self.policy_step_adapter.plugin_name +class RedisPluginCommonData(RedisCommonData): + + # 进程状态列表 + @property + def process_statuses(self) -> List[models.ProcessStatus]: + return self._get_attr_from_redis("process_statuses") + + # 目标主机列表,用于远程采集场景 + @property + def target_host_objs(self) -> Optional[List[models.Host]]: + return self._get_attr_from_redis("target_host_objs") + + # PluginStep 适配器,用于屏蔽不同类型的插件操作类订阅差异 + @property + def policy_step_adapter(self) -> PolicyStepAdapter: + return self._get_attr_from_redis("policy_step_adapter") + + # group_id - 订阅实例记录映射关系 + @property + def group_id_instance_map(self) -> Dict[str, models.SubscriptionInstanceRecord]: + return self._get_attr_from_redis("group_id_instance_map") + + # 插件名称 + @property + def plugin_name(self) -> str: + return self._get_attr_from_redis("policy_step_adapter").plugin_name + + class PluginBaseService(BaseService, metaclass=abc.ABCMeta): """ 插件原子基类,提供一些常用的数据获取方法 @@ -134,7 +166,8 @@ def get_common_data(cls, data): process_statuses = models.ProcessStatus.objects.filter( name=policy_step_adapter.plugin_name, group_id__in=group_id_instance_map.keys() ) - return PluginCommonData( + plugin_common_data_cls = PluginCommonData if isinstance(common_data, CommonData) else RedisPluginCommonData + return plugin_common_data_cls( bk_host_ids=common_data.bk_host_ids, host_id_obj_map=common_data.host_id_obj_map, ap_id_obj_map=common_data.ap_id_obj_map, diff --git a/apps/backend/periodic_tasks/cache_scope_instances.py b/apps/backend/periodic_tasks/cache_scope_instances.py index d4d534de65..75b4d09569 100644 --- a/apps/backend/periodic_tasks/cache_scope_instances.py +++ b/apps/backend/periodic_tasks/cache_scope_instances.py @@ -15,6 +15,7 @@ from apps.backend.subscription import constants, tools from apps.node_man import models +from apps.node_man.constants import DataBackend from apps.utils.md5 import count_md5 from apps.utils.periodic_task import calculate_countdown @@ -31,8 +32,9 @@ def get_instances_by_scope_task(subscription_id): f" scope_md5: {scope_md5}, scope: {subscription.scope}" ) # 查询后会进行缓存,详见 get_instances_by_scope 的装饰器 func_cache_decorator + data_backend = DataBackend.REDIS.value if subscription.is_multi_paralle_gateway else DataBackend.MEM.value tools.get_instances_by_scope_with_checker( - subscription.scope, subscription.steps, source="get_instances_by_scope_task" + subscription.scope, subscription.steps, source="get_instances_by_scope_task", data_backend=data_backend ) logger.info(f"[cache_subscription_scope_instances] (subscription: {subscription_id}) end.") diff --git a/apps/backend/subscription/commons.py b/apps/backend/subscription/commons.py index 851f853e3d..03f24bf9f4 100644 --- a/apps/backend/subscription/commons.py +++ b/apps/backend/subscription/commons.py @@ -9,6 +9,7 @@ specific language governing permissions and limitations under the License. """ import logging +from typing import Union from django.core.cache import cache @@ -18,6 +19,7 @@ from apps.prometheus import metrics from apps.prometheus.helper import SetupObserve, get_call_resource_labels_func from apps.utils.batch_request import batch_request +from apps.utils.redis import DynamicContainer, RedisList logger = logging.getLogger("app") @@ -38,7 +40,7 @@ def get_host_object_attribute(bk_biz_id): @SetupObserve(counter=metrics.app_common_method_requests_total, get_labels_func=get_call_resource_labels_func) -def list_biz_hosts(bk_biz_id, condition, func, split_params=False): +def list_biz_hosts(bk_biz_id, condition, func, split_params=False, data_backend: str = None): biz_custom_property = [] kwargs = { "fields": constants.CC_HOST_FIELDS, @@ -50,14 +52,16 @@ def list_biz_hosts(bk_biz_id, condition, func, split_params=False): kwargs["fields"] += list(set(biz_custom_property)) kwargs["fields"] = list(set(kwargs["fields"])) kwargs.update(condition) - - hosts = batch_request(getattr(client_v2.cc, func), kwargs, split_params=split_params) + hosts = batch_request(getattr(client_v2.cc, func), kwargs, split_params=split_params, data_backend=data_backend) # 排除掉CMDB中内网IP为空的主机 - cleaned_hosts = [host for host in hosts if host.get("bk_host_innerip") or host.get("bk_host_innerip_v6")] + cleaned_hosts: Union[RedisList, list] = DynamicContainer( + return_type=constants.DCReturnType.LIST.value, data_backend=data_backend + ).container + cleaned_hosts.extend([host for host in hosts if host.get("bk_host_innerip") or host.get("bk_host_innerip_v6")]) return cleaned_hosts -def get_host_by_inst(bk_biz_id, inst_list): +def get_host_by_inst(bk_biz_id, inst_list, data_backend: str = None): """ 根据拓扑节点查询主机 :param inst_list: 实例列表 @@ -67,7 +71,9 @@ def get_host_by_inst(bk_biz_id, inst_list): if not bk_biz_id: raise BizNotExistError() - hosts = [] + hosts: Union[RedisList, list] = DynamicContainer( + return_type=constants.DCReturnType.LIST.value, data_backend=data_backend + ).container bk_module_ids = [] bk_set_ids = [] bk_biz_ids = [] @@ -88,13 +94,27 @@ def get_host_by_inst(bk_biz_id, inst_list): # 自定义层级 topo_cond = {"bk_obj_id": inst["bk_obj_id"], "bk_inst_id": inst["bk_inst_id"]} hosts.extend( - list_biz_hosts(bk_biz_id, topo_cond, "find_host_by_topo", source="get_host_by_inst:find_host_by_topo") + list_biz_hosts( + bk_biz_id, + topo_cond, + "find_host_by_topo", + source="get_host_by_inst:find_host_by_topo", + data_backend=data_backend, + ) ) if bk_biz_ids: # 业务查询 for bk_biz_id in bk_biz_ids: - hosts.extend(list_biz_hosts(bk_biz_id, {}, "list_biz_hosts", source="get_host_by_inst:list_biz_hosts:biz")) + hosts.extend( + list_biz_hosts( + bk_biz_id, + {}, + "list_biz_hosts", + source="get_host_by_inst:list_biz_hosts:biz", + data_backend=data_backend, + ) + ) if bk_set_ids: # 集群查询 hosts.extend( @@ -102,6 +122,7 @@ def get_host_by_inst(bk_biz_id, inst_list): bk_biz_id, {"set_cond": [{"field": "bk_set_id", "operator": "$in", "value": bk_set_ids}]}, "list_biz_hosts", + data_backend=data_backend, ) ) if bk_module_ids: @@ -113,6 +134,7 @@ def get_host_by_inst(bk_biz_id, inst_list): "list_biz_hosts", split_params=True, source="get_host_by_inst:list_biz_hosts:module", + data_backend=data_backend, ) ) diff --git a/apps/backend/subscription/constants.py b/apps/backend/subscription/constants.py index e9d2730972..7226ec12bb 100644 --- a/apps/backend/subscription/constants.py +++ b/apps/backend/subscription/constants.py @@ -32,5 +32,8 @@ # 单个任务主机数量 TASK_HOST_LIMIT = 500 +# 单个并行网关的子进程数量 并行网关主机数量 1000*500 +PARALLE_GATEWAY_PROCESS_LIMIT = 1000 + # 订阅范围实例缓存时间,比自动下发周期多1小时 SUBSCRIPTION_SCOPE_CACHE_TIME = SUBSCRIPTION_UPDATE_INTERVAL + constants.TimeUnit.HOUR diff --git a/apps/backend/subscription/handler.py b/apps/backend/subscription/handler.py index 021d1ddfbe..84ea6e434b 100644 --- a/apps/backend/subscription/handler.py +++ b/apps/backend/subscription/handler.py @@ -15,7 +15,7 @@ import random from collections import Counter, defaultdict from copy import deepcopy -from typing import Any, Dict, List, Optional, Set +from typing import Any, Dict, List, Optional, Set, Union from django.conf import settings from django.core.cache import cache @@ -32,6 +32,7 @@ from apps.node_man import constants, models from apps.utils import concurrent from apps.utils.basic import filter_values +from apps.utils.redis import RedisDict from pipeline.engine.models import PipelineProcess from pipeline.service import task_service @@ -135,9 +136,18 @@ def task_result( if not need_out_of_scope_snapshots: # 如果不需要已不在订阅范围内的执行快照,查询订阅范围过滤掉移除的实例 ID subscription = models.Subscription.objects.get(id=self.subscription_id) + data_backend = ( + constants.DataBackend.REDIS.value + if subscription.is_multi_paralle_gateway + else constants.DataBackend.MEM.value + ) scope_instance_id_list: Set[str] = set( tools.get_instances_by_scope_with_checker( - subscription.scope, subscription.steps, get_cache=True, source="task_result" + subscription.scope, + subscription.steps, + get_cache=True, + source="task_result", + data_backend=data_backend, ).keys() ) base_kwargs["instance_id__in"] = scope_instance_id_list @@ -523,8 +533,13 @@ def statistic(subscription_id_list: List[int]) -> List[Dict]: sub_statistic_list: List[Dict] = [] for subscription in subscriptions: sub_statistic = {"subscription_id": subscription.id, "status": []} - current_instances = tools.get_instances_by_scope_with_checker( - subscription.scope, subscription.steps, get_cache=True, source="statistic" + data_backend = ( + constants.DataBackend.REDIS.value + if subscription.is_multi_paralle_gateway + else constants.DataBackend.MEM.value + ) + current_instances: Union[RedisDict, dict] = tools.get_instances_by_scope_with_checker( + subscription.scope, subscription.steps, get_cache=True, source="statistic", data_backend=data_backend ) status_statistic = {"SUCCESS": 0, "PENDING": 0, "FAILED": 0, "RUNNING": 0} @@ -637,8 +652,17 @@ def instance_status(subscription_id_list: List[int], show_task_detail: bool) -> result = [] for subscription in subscriptions: subscription_result = [] - current_instances = tools.get_instances_by_scope_with_checker( - subscription.scope, subscription.steps, get_cache=True, source="instance_status" + data_backend = ( + constants.DataBackend.REDIS.value + if subscription.is_multi_paralle_gateway + else constants.DataBackend.MEM.value + ) + current_instances: Union[RedisDict, dict] = tools.get_instances_by_scope_with_checker( + subscription.scope, + subscription.steps, + get_cache=True, + source="instance_status", + data_backend=data_backend, ) # 对于每个instance,通过group_id找到其对应的host_status diff --git a/apps/backend/subscription/steps/agent.py b/apps/backend/subscription/steps/agent.py index ec6bb466bc..dbef6be9d0 100644 --- a/apps/backend/subscription/steps/agent.py +++ b/apps/backend/subscription/steps/agent.py @@ -195,12 +195,14 @@ def generate_activities( global_pipeline_data: builder.Data, meta: Dict[str, Any], current_activities=None, + is_multi_paralle_gateway=False, ) -> Tuple[List[Union[builder.ServiceActivity, Element]], Optional[builder.Data]]: agent_manager = self.get_agent_manager(subscription_instances) activities, pipeline_data = self._generate_activities(agent_manager) for act in activities: act.component.inputs.subscription_step_id = Var(type=Var.PLAIN, value=self.step.subscription_step.id) act.component.inputs.meta = Var(type=Var.PLAIN, value=meta) + act.component.inputs.is_multi_paralle_gateway = Var(type=Var.PLAIN, value=is_multi_paralle_gateway) self.inject_vars_to_global_data(global_pipeline_data, meta) if self.is_install_other_agent: activities = list(filter(lambda x: x.component["code"] in self.install_other_agent_codes, activities)) diff --git a/apps/backend/subscription/steps/base.py b/apps/backend/subscription/steps/base.py index 9aa5cbb2b3..0e6441ca82 100644 --- a/apps/backend/subscription/steps/base.py +++ b/apps/backend/subscription/steps/base.py @@ -140,5 +140,6 @@ def generate_activities( global_pipeline_data: builder.Data, meta: Dict[str, Any], current_activities=None, + is_multi_paralle_gateway=False, ) -> Tuple[List[Union[builder.ServiceActivity, Element]], Optional[builder.Data]]: raise NotImplementedError diff --git a/apps/backend/subscription/steps/plugin.py b/apps/backend/subscription/steps/plugin.py index f11e9cea0a..e63a3e88bc 100644 --- a/apps/backend/subscription/steps/plugin.py +++ b/apps/backend/subscription/steps/plugin.py @@ -31,6 +31,7 @@ from apps.node_man import constants, models from apps.node_man.exceptions import ApIDNotExistsError from apps.utils import concurrent +from apps.utils.redis import RedisDict from common.log import logger from pipeline.builder import Data, Var @@ -384,7 +385,7 @@ def handle_uninstall_instances( def handle_new_add_instances( self, install_action: str, - instances: Dict[str, Dict], + instances: Union[RedisDict, Dict[str, Dict]], instance_actions: Dict[str, str], bk_host_id__host_map: Dict[int, models.Host], group_id__host_key__proc_status_map: Dict[str, Dict[str, models.ProcessStatus]], @@ -442,7 +443,7 @@ def handle_exceed_max_retry_times_instances( def handle_manual_op_instances( self, - instances: Dict[str, Dict], + instances: Union[RedisDict, Dict[str, Dict]], instance_actions: Dict[str, str], auto_trigger: bool, push_migrate_reason_func: Callable, @@ -493,7 +494,10 @@ def handle_manual_op_instances( ) def handle_not_change_instances( - self, instances: Dict[str, Dict], migrate_reasons: Dict[str, Dict], push_migrate_reason_func: Callable + self, + instances: Union[RedisDict, Dict[str, Dict]], + migrate_reasons: Dict[str, Dict], + push_migrate_reason_func: Callable, ): """ 处理无需变更实例,请在最后调用该钩子 @@ -585,7 +589,7 @@ def handle_check_and_skip_instances( instance_actions: Dict[str, str], push_migrate_reason_func: Callable, bk_host_id__host_map: Dict[int, models.Host], - instances: Dict[str, Dict[str, Union[Dict, Any]]], + instances: Union[RedisDict, Dict[str, Dict]], ): """ 插件状态及版本检查,确定是否执行安装 @@ -715,7 +719,7 @@ def get_action_dict(self) -> Dict[str, str]: def make_instances_migrate_actions( self, - instances: Dict[str, Dict[str, Union[Dict, Any]]], + instances: Union[RedisDict, Dict[str, Dict]], auto_trigger: bool = False, preview_only: bool = False, **kwargs, @@ -765,7 +769,7 @@ def _push_migrate_reason(_instance_id: str, **_extra_info): id_to_instance_id = {} instance_key = "host" if self.subscription.object_type == models.Subscription.ObjectType.HOST else "service" id_key = "bk_host_id" if instance_key == "host" else "id" - for instance_id, instance in list(instances.items()): + for instance_id, instance in instances.items(): instance_ids.add(instance_id) bk_host_ids.add(instance["host"]["bk_host_id"]) id_to_instance_id[instance[instance_key][id_key]] = instance_id @@ -951,6 +955,7 @@ def generate_activities( global_pipeline_data: Data, meta: Dict[str, Any], current_activities=None, + is_multi_paralle_gateway=False, ) -> Tuple[List[PluginServiceActivity], Data]: plugin_manager = self.get_plugin_manager(subscription_instances) activities = [] @@ -980,6 +985,7 @@ def generate_activities( act.component.inputs.plugin_name = Var(type=Var.PLAIN, value=self.step.plugin_name) act.component.inputs.subscription_step_id = Var(type=Var.PLAIN, value=self.step.subscription_step.id) act.component.inputs.meta = Var(type=Var.PLAIN, value=meta) + act.component.inputs.is_multi_paralle_gateway = Var(type=Var.PLAIN, value=is_multi_paralle_gateway) return activities, pipeline_data @abc.abstractmethod diff --git a/apps/backend/subscription/task_tools.py b/apps/backend/subscription/task_tools.py index 4c641cff97..9e48376345 100644 --- a/apps/backend/subscription/task_tools.py +++ b/apps/backend/subscription/task_tools.py @@ -32,31 +32,34 @@ class TaskResultTools: def list_pipeline_processes(pipeline_id: str) -> Dict[str, List[Dict]]: pipeline = models.PipelineTree.objects.get(id=pipeline_id).tree - parallel_gw = next( - (gw for gw in pipeline["gateways"].values() if gw["type"] == pipeline_parser.ActType.PARALLEL), None - ) + # parallel_gw = next( + # (gw for gw in pipeline["gateways"].values() if gw["type"] == pipeline_parser.ActType.PARALLEL), None + # ) + parallel_gws = [gw for gw in pipeline["gateways"].values() if gw["type"] == pipeline_parser.ActType.PARALLEL] pipeline_processes = {} - for outgoing in parallel_gw["outgoing"]: - pipeline_process = [] - index = 0 - while True: - next_node = next( - (node for node in pipeline["activities"].values() if outgoing in node["incoming"]), None - ) - if not next_node: - break - pipeline_process.append( - { - "node_id": next_node["id"], - "name": next_node["name"], - "step_code": next_node["component"].get("code"), - "index": index, - } - ) - index = index + 1 - outgoing = next_node["outgoing"] - pipeline_processes[pipeline_process[0]["node_id"]] = pipeline_process + for parallel_gw in parallel_gws: + for outgoing in parallel_gw["outgoing"]: + pipeline_process = [] + index = 0 + while True: + next_node = next( + (node for node in pipeline["activities"].values() if outgoing in node["incoming"]), None + ) + if not next_node: + break + pipeline_process.append( + { + "node_id": next_node["id"], + "name": next_node["name"], + "step_code": next_node["component"].get("code"), + "index": index, + } + ) + index = index + 1 + outgoing = next_node["outgoing"] + pipeline_processes[pipeline_process[0]["node_id"]] = pipeline_process + return pipeline_processes @staticmethod diff --git a/apps/backend/subscription/tasks.py b/apps/backend/subscription/tasks.py index 8165cc4b8e..c626c6c9c9 100644 --- a/apps/backend/subscription/tasks.py +++ b/apps/backend/subscription/tasks.py @@ -24,7 +24,10 @@ from apps.backend.celery import app from apps.backend.components.collections.base import ActivityType from apps.backend.subscription import handler, tools -from apps.backend.subscription.constants import TASK_HOST_LIMIT +from apps.backend.subscription.constants import ( + PARALLE_GATEWAY_PROCESS_LIMIT, + TASK_HOST_LIMIT, +) from apps.backend.subscription.errors import SubscriptionInstanceEmpty from apps.backend.subscription.steps import StepFactory, agent from apps.core.gray.tools import GrayTools @@ -33,6 +36,7 @@ from apps.node_man.handlers.cmdb import CmdbHandler from apps.prometheus import metrics from apps.utils import md5, translation +from apps.utils.redis import DynamicContainer, RedisDict from pipeline import builder from pipeline.builder import Data, NodeOutput, ServiceActivity, Var from pipeline.core.pipeline import Pipeline @@ -107,6 +111,7 @@ def build_instances_task( global_pipeline_data=global_pipeline_data, meta=inject_meta, current_activities=current_activities, + is_multi_paralle_gateway=subscription.is_multi_paralle_gateway, ) # 记录每个 step 的起始 id 及步骤名称 @@ -172,7 +177,8 @@ def create_pipeline( subscription: models.Subscription, instances_action: Dict[str, Dict[str, str]], subscription_instances: List[models.SubscriptionInstanceRecord], - task_host_limit: int, + task_host_limit_config: Dict[str, int], + is_multi_paralle_gateway: bool, ) -> Pipeline: """ 批量执行实例的步骤的动作 @@ -196,6 +202,22 @@ def create_pipeline( | | | install_plugin install_plugin ....... | | | + render_config render_config ....... + | | | + ....... ....... ....... + | | | + --------------------------------------- + | + ConvergeGateway + | + ParallelGateway + | + --------------------------------------- + | | | + 500_host_init 500_host_init ....... + | | | + install_plugin install_plugin ....... + | | | render_config render_config ....... | | | ....... ....... ....... @@ -231,9 +253,12 @@ def create_pipeline( # if instance_id in subscription_instance_map: # action_instances[json.dumps(step_actions)].append(subscription_instance_map[instance_id]) + task_host_limit: int = task_host_limit_config["task_host_limit"] + paralle_gateway_process_limit: int = task_host_limit_config["paralle_gateway_process_limit"] + sub_processes = [] + global_pipeline_data = Data() - start_event = builder.EmptyStartEvent() for metadata_json_str, sub_insts in sub_insts_gby_metadata.items(): start = 0 metadata = json.loads(metadata_json_str) @@ -247,16 +272,46 @@ def create_pipeline( ) sub_processes.append(activities_start_event) start = start + task_host_limit - parallel_gw = builder.ParallelGateway() - converge_gw = builder.ConvergeGateway() + + start_event = builder.EmptyStartEvent() end_event = builder.EmptyEndEvent() - start_event.extend(parallel_gw).connect(*sub_processes).to(parallel_gw).converge(converge_gw).extend(end_event) + + if not is_multi_paralle_gateway: + # 保留原有逻辑,新逻辑按业务进行灰度,稳定后,此部分逻辑可以清理 + parallel_gw = builder.ParallelGateway() + converge_gw = builder.ConvergeGateway() + start_event.extend(parallel_gw).connect(*sub_processes).to(parallel_gw).converge(converge_gw).extend(end_event) + else: + sub_processes.reverse() + + parallel_gws = [] + converge_gws = [] + + for start in range(0, len(sub_processes), paralle_gateway_process_limit): + parallel_gw = builder.ParallelGateway() + converge_gw = builder.ConvergeGateway() + parallel_gw.connect(*sub_processes[start : start + paralle_gateway_process_limit]).to(parallel_gw).converge( + converge_gw + ) + + parallel_gws.append(parallel_gw) + converge_gws.append(converge_gw) + + for _converge_gw, _parallel_gw in zip(converge_gws[0:-1], parallel_gws[1:]): + _converge_gw.extend(_parallel_gw) + + start_event.extend(parallel_gws[0]) + converge_gws[-1].extend(end_event) # 构造pipeline树 tree = builder.build_tree(start_event, data=global_pipeline_data) models.PipelineTree.objects.create(id=tree["id"], tree=tree) - parser = PipelineParser(pipeline_tree=tree) + # 固定流程无需检查是否存在cycle, cycle_tolerate=True 跳过cycle检查可节省大部分时间 + if is_multi_paralle_gateway: + parser = PipelineParser(pipeline_tree=tree, cycle_tolerate=True) + else: + parser = PipelineParser(pipeline_tree=tree) pipeline = parser.parse() return pipeline @@ -319,9 +374,10 @@ def wrapper(subscription: models.Subscription, subscription_task: models.Subscri def create_task( subscription: models.Subscription, subscription_task: models.SubscriptionTask, - instances: Dict[str, Dict[str, Union[Dict, Any]]], + instances: Union[RedisDict, Dict[str, Dict]], instance_actions: Dict[str, Dict[str, str]], preview_only: bool = False, + data_backend: str = None, ): """ 创建执行任务 @@ -338,12 +394,12 @@ def create_task( :return: SubscriptionTask """ # 兜底注入 Meta,此处注入是覆盖面最全的(包含历史被移除实例) - GrayTools().inject_meta_to_instances(instances) + injected_instances: Union[RedisDict, dict] = GrayTools().inject_meta_to_instances(instances, data_backend) logger.info( "[sub_lifecycle][create_task] inject meta to instances[num=%s] successfully", subscription.id, subscription_task.id, - len(instances), + len(injected_instances), ) topo_order = CmdbHandler.get_topo_order() @@ -361,8 +417,9 @@ def create_task( # 批量创建订阅实例执行记录 to_be_created_records_map = {} plugin__host_id__bk_obj_sub_map = {} + instance_ids = injected_instances.keys() for instance_id, step_action in instance_actions.items(): - if instance_id not in instances: + if instance_id not in instance_ids: # instance_id不在instances中,则说明该实例可能已经不在该业务中,因此无法操作,故不处理。 continue @@ -373,7 +430,7 @@ def create_task( agent.InstallProxy.ACTION_NAME, agent.InstallProxy2.ACTION_NAME, ] - instance_info = instances[instance_id] + instance_info = injected_instances[instance_id] host_info = instance_info["host"] record = models.SubscriptionInstanceRecord( task_id=subscription_task.id, @@ -502,10 +559,17 @@ def create_task( amount=len(created_instance_records) ) - task_host_limit = models.GlobalSettings.get_config( - models.GlobalSettings.KeyEnum.TASK_HOST_LIMIT.value, default=TASK_HOST_LIMIT + task_host_limit_config: Dict[str, int] = models.GlobalSettings.get_config( + models.GlobalSettings.KeyEnum.TASK_HOST_LIMIT_CONFIG.value, + default={"task_host_limit": TASK_HOST_LIMIT, "paralle_gateway_process_limit": PARALLE_GATEWAY_PROCESS_LIMIT}, + ) + pipeline = create_pipeline( + subscription, + instance_actions, + created_instance_records, + task_host_limit_config, + subscription.is_multi_paralle_gateway, ) - pipeline = create_pipeline(subscription, instance_actions, created_instance_records, task_host_limit) # 保存pipeline id subscription_task.pipeline_id = pipeline.id subscription_task.save(update_fields=["actions", "pipeline_id"]) @@ -598,9 +662,19 @@ def run_subscription_task_and_create_instance( # 获取订阅范围内全部实例 steps = subscription.steps tolerance_time: int = (59, 0)[subscription.is_need_realtime()] - instances = tools.get_instances_by_scope_with_checker( - scope, steps, source="run_subscription_task_and_create_instance", tolerance_time=tolerance_time + + data_backend = ( + constants.DataBackend.REDIS.value if subscription.is_multi_paralle_gateway else constants.DataBackend.MEM.value + ) + + instances: Union[RedisDict, dict] = tools.get_instances_by_scope_with_checker( + scope, + steps, + source="run_subscription_task_and_create_instance", + tolerance_time=tolerance_time, + data_backend=data_backend, ) + logger.info( "[sub_lifecycle][run_subscription_task_and_create_instance] " "get_instances_by_scope_with_checker -> %s", @@ -624,34 +698,41 @@ def run_subscription_task_and_create_instance( if actions is not None: # 指定了动作,不需要计算,直接执行即可 - instance_actions = {instance_id: actions for instance_id in instances} + instance_actions: Union[RedisDict, dict] = DynamicContainer(data_backend=data_backend).container + for instance_id in instances.keys(): + instance_actions[instance_id] = actions create_task(subscription, subscription_task, instances, instance_actions) return # 预注入 Meta,用于变更计算(仅覆盖当前订阅范围,移除场景通过 create_task 兜底注入) - GrayTools().inject_meta_to_instances(instances) + injected_instances: Union[RedisDict, dict] = GrayTools().inject_meta_to_instances(instances, data_backend) logger.info( "[sub_lifecycle][run_subscription_task_and_create_instance] " "pre-inject meta to instances[num=%s] successfully", subscription.id, subscription_task.id, - len(instances), + len(injected_instances), ) # 按步骤顺序计算实例变更所需的动作 - instance_actions = defaultdict(dict) - instance_migrate_reasons = defaultdict(dict) + instance_actions: dict = {} + instance_migrate_reasons: Union[RedisDict, dict] = DynamicContainer(data_backend=data_backend).container is_unknown_migrate_type_exists: bool = False for step in step_managers.values(): # 计算变更的动作 migrate_results = step.make_instances_migrate_actions( - instances, auto_trigger=subscription_task.is_auto_trigger, preview_only=preview_only + injected_instances, auto_trigger=subscription_task.is_auto_trigger, preview_only=preview_only ) # 归类变更动作 # eg: {"host|instance|host|1": "MAIN_INSTALL_PLUGIN"} instance_id_action_map: Dict[str, str] = migrate_results["instance_actions"] + for instance_id, action in instance_id_action_map.items(): - instance_actions[instance_id][step.step_id] = action + instance_step_id_action = instance_actions.get(instance_id, {}) + instance_step_id_action[step.step_id] = action + instance_actions[instance_id] = instance_step_id_action + + # instance_actions[instance_id][step.step_id] = action metrics.app_task_instances_migrate_actions_total.labels(step_id=step.step_id, action=action).inc() # 归类变更原因 @@ -667,7 +748,10 @@ def run_subscription_task_and_create_instance( # } instance_id_action_reason_map: Dict[str, Dict] = migrate_results["migrate_reasons"] for instance_id, migrate_reason in instance_id_action_reason_map.items(): - instance_migrate_reasons[instance_id][step.step_id] = migrate_reason + instance_step_migrate_reasons = instance_migrate_reasons.get(instance_id, {}) + instance_step_migrate_reasons[step.step_id] = migrate_reason + instance_migrate_reasons[instance_id] = instance_step_migrate_reasons + # instance_migrate_reasons[instance_id][step.step_id] = migrate_reason if "migrate_type" not in migrate_reason: is_unknown_migrate_type_exists = True metrics.app_task_instances_migrate_reasons_total.labels( @@ -685,7 +769,9 @@ def run_subscription_task_and_create_instance( ) # 查询被从范围内移除的实例 - instance_not_in_scope = [instance_id for instance_id in instance_actions if instance_id not in instances] + instance_not_in_scope = [ + instance_id for instance_id in instance_actions if instance_id not in injected_instances.keys() + ] if instance_not_in_scope: deleted_id_not_in_scope = [] @@ -704,7 +790,7 @@ def run_subscription_task_and_create_instance( subscription_task.id, deleted_id_not_in_scope, ) - deleted_instance_info = tools.get_instances_by_scope_with_checker( + deleted_instance_info: Union[RedisDict, dict] = tools.get_instances_by_scope_with_checker( { "bk_biz_id": subscription.bk_biz_id, "object_type": subscription.object_type, @@ -713,10 +799,11 @@ def run_subscription_task_and_create_instance( }, steps, source="find_deleted_instances", + data_backend=data_backend, ) # 如果被删掉的实例在 CMDB 找不到,那么就使用最近一次的 InstanceRecord 的快照数据 - not_exist_instance_id = set(instance_not_in_scope) - set(deleted_instance_info) + not_exist_instance_id = set(instance_not_in_scope) - set(deleted_instance_info.keys()) latest_instance_ids = set() if not_exist_instance_id: records = list( @@ -765,10 +852,10 @@ def run_subscription_task_and_create_instance( len(not_exist_db_instance_id_set), ) - instances.update(deleted_instance_info) + injected_instances.update(deleted_instance_info) create_task_result = create_task( - subscription, subscription_task, instances, instance_actions, preview_only=preview_only + subscription, subscription_task, injected_instances, instance_actions, preview_only=preview_only ) return { @@ -776,7 +863,7 @@ def run_subscription_task_and_create_instance( "error_hosts": create_task_result["error_hosts"], "instance_actions": instance_actions, "instance_migrate_reasons": instance_migrate_reasons, - "instance_id__inst_info_map": instances, + "instance_id__inst_info_map": injected_instances, } diff --git a/apps/backend/subscription/tools.py b/apps/backend/subscription/tools.py index f40e687826..a67937b25e 100644 --- a/apps/backend/subscription/tools.py +++ b/apps/backend/subscription/tools.py @@ -50,6 +50,7 @@ from apps.utils.basic import chunk_lists, distinct_dict_list, order_dict from apps.utils.batch_request import batch_request, request_multi_thread from apps.utils.concurrent import batch_call +from apps.utils.redis import DynamicContainer, RedisDict, RedisList from apps.utils.time_handler import strftime_local from common.api import CCApi @@ -251,7 +252,7 @@ def create_host_key(data: Dict) -> str: @SetupObserve(counter=metrics.app_common_method_requests_total, get_labels_func=get_call_resource_labels_func) -def find_host_biz_relations(bk_host_ids: List[int]) -> List[Dict]: +def find_host_biz_relations(bk_host_ids: List[int], data_backend: str = None) -> List[Dict]: """ 查询主机所属拓扑关系 :param bk_host_ids: 主机ID列表 [1, 2, 3] @@ -268,15 +269,16 @@ def find_host_biz_relations(bk_host_ids: List[int]) -> List[Dict]: """ # 查询条件为空提前返回 if not bk_host_ids: - return [] + return DynamicContainer(return_type=constants.DCReturnType.LIST.value, data_backend=data_backend).container # CMDB 限制了单次查询数量,这里需分批并发请求查询 param_list = [ {"bk_host_id": bk_host_ids[count * constants.QUERY_CMDB_LIMIT : (count + 1) * constants.QUERY_CMDB_LIMIT]} for count in range(math.ceil(len(bk_host_ids) / constants.QUERY_CMDB_LIMIT)) ] - host_biz_relations = request_multi_thread(client_v2.cc.find_host_biz_relations, param_list, get_data=lambda x: x) - return host_biz_relations + return request_multi_thread( + client_v2.cc.find_host_biz_relations, param_list, get_data=lambda x: x, data_backend=data_backend + ) @controller.ConcurrentController( @@ -286,7 +288,11 @@ def find_host_biz_relations(bk_host_ids: List[int]) -> List[Dict]: get_config_dict_kwargs={"config_name": core.ServiceCCConfigName.CMDB_QUERY.value}, ) def get_service_instances( - bk_biz_id: int, filter_id_list: List[int], filter_field_name: FilterFieldName, ignore_exception: bool = True + bk_biz_id: int, + filter_id_list: List[int], + filter_field_name: FilterFieldName, + ignore_exception: bool = True, + data_backend: str = None, ) -> List[Dict]: """ 分批查询业务主机进程 @@ -308,12 +314,13 @@ def get_service_instances( "no_request": True, } if filter_field_name.needs_batch_request: - result = batch_request( + result: Union[RedisList, List] = batch_request( CCApi.list_service_instance_detail, params, sort="id", limit=constants.LIST_SERVICE_INSTANCE_DETAIL_LIMIT, interval=constants.LIST_SERVICE_INSTANCE_DETAIL_INTERVAL, + data_backend=data_backend, ) else: params["page"] = { @@ -382,7 +389,7 @@ def get_modules_by_inst_list(inst_list, module_to_topo): return module_ids, no_module_inst_list -def get_service_instance_by_inst(bk_biz_id, inst_list, module_to_topo): +def get_service_instance_by_inst(bk_biz_id, inst_list, module_to_topo, data_backend: str = None): module_ids, no_module_inst_list = get_modules_by_inst_list(inst_list, module_to_topo) if not module_ids: return [] @@ -402,28 +409,36 @@ def get_service_instance_by_inst(bk_biz_id, inst_list, module_to_topo): }, "sort": "id", "limit": constants.LIST_SERVICE_INSTANCE_DETAIL_LIMIT, + "data_backend": data_backend, } for bk_module_id in module_ids ] - service_instances = batch_call( - batch_request, params, extend_result=True, interval=constants.LIST_SERVICE_INSTANCE_DETAIL_INTERVAL + service_instances: Union[RedisList, list] = batch_call( + batch_request, + params, + extend_result=True, + interval=constants.LIST_SERVICE_INSTANCE_DETAIL_INTERVAL, + data_backend=data_backend, ) else: params = {"bk_biz_id": int(bk_biz_id), "with_name": True, "no_request": True} - service_instances = batch_request( + service_instances: Union[RedisList, list] = batch_request( CCApi.list_service_instance_detail, params, sort="id", limit=constants.LIST_SERVICE_INSTANCE_DETAIL_LIMIT, interval=constants.LIST_SERVICE_INSTANCE_DETAIL_INTERVAL, + data_backend=data_backend, ) - service_instances = [ - service_instance for service_instance in service_instances if service_instance["bk_module_id"] in module_ids - ] - - return service_instances + redis_service_instances: Union[RedisList, list] = DynamicContainer( + return_type=constants.DCReturnType.LIST.value, data_backend=data_backend + ).container + for service_instance in service_instances: + if service_instance["bk_module_id"] in module_ids: + redis_service_instances.append(service_instance) + return redis_service_instances @FuncCacheDecorator(cache_time=1 * constants.TimeUnit.MINUTE) @@ -448,7 +463,7 @@ def fetch_biz_info(bk_biz_ids: typing.List[int]) -> typing.Dict[int, typing.Dict return {bk_biz_id: biz_info_map.get(str(bk_biz_id)) or {} for bk_biz_id in bk_biz_ids} -def get_host_detail_by_template(bk_obj_id, template_info_list: list, bk_biz_id: int = None): +def get_host_detail_by_template(bk_obj_id, template_info_list: list, bk_biz_id: int = None, data_backend: str = None): """ 根据集群模板ID/服务模板ID获得主机的详细信息 :param bk_obj_id: 模板类型 @@ -465,15 +480,17 @@ def get_host_detail_by_template(bk_obj_id, template_info_list: list, bk_biz_id: # 服务模板 call_func = client_v2.cc.find_host_by_service_template template_ids = [info["bk_inst_id"] for info in template_info_list] - host_info_result = batch_request( - call_func, dict(bk_service_template_ids=template_ids, bk_biz_id=bk_biz_id, fields=fields) + host_info_result: Union[RedisList, list] = batch_request( + call_func, + dict(bk_service_template_ids=template_ids, bk_biz_id=bk_biz_id, fields=fields, data_backend=data_backend), ) else: # 集群模板 call_func = client_v2.cc.find_host_by_set_template template_ids = [info["bk_inst_id"] for info in template_info_list] - host_info_result = batch_request( - call_func, dict(bk_set_template_ids=template_ids, bk_biz_id=bk_biz_id, fields=fields) + host_info_result: Union[RedisList, list] = batch_request( + call_func, + dict(bk_set_template_ids=template_ids, bk_biz_id=bk_biz_id, fields=fields, data_backend=data_backend), ) biz_info = fetch_biz_info([bk_biz_id]) cloud_id_name_map = models.Cloud.cloud_id_name_map(get_cache=True) @@ -481,12 +498,18 @@ def get_host_detail_by_template(bk_obj_id, template_info_list: list, bk_biz_id: if not biz_info[bk_biz_id]: logger.warning("[get_host_detail_by_template] can not find biz_info -> %s", bk_biz_id) + is_redis_data_backend: bool = data_backend == constants.DataBackend.REDIS.value + if is_redis_data_backend: + redis_host_info_result: RedisList = RedisList() + for host in host_info_result: host["bk_biz_id"] = bk_biz_id host["bk_biz_name"] = host["bk_biz_name"] = biz_info[bk_biz_id].get("bk_biz_name") host["bk_cloud_name"] = cloud_id_name_map.get(str(host["bk_cloud_id"])) + if is_redis_data_backend: + redis_host_info_result.append(host) - return host_info_result + return redis_host_info_result if is_redis_data_backend else host_info_result def get_service_instances_by_template(bk_obj_id, template_info_list: list, bk_biz_id: int = None): @@ -527,7 +550,7 @@ def get_service_instances_by_template(bk_obj_id, template_info_list: list, bk_bi @SetupObserve(counter=metrics.app_common_method_requests_total, get_labels_func=get_call_resource_labels_func) -def get_host_detail(host_info_list: list, bk_biz_id: int = None): +def get_host_detail(host_info_list: list, bk_biz_id: int = None, data_backend: str = None): """ 获取主机详情 :param bk_biz_id: 业务ID @@ -544,10 +567,11 @@ def get_host_detail(host_info_list: list, bk_biz_id: int = None): }] :return: list 主机详细信息 """ + host_details: Union[RedisList, list] = DynamicContainer( + return_type=constants.DCReturnType.LIST.value, data_backend=data_backend + ).container if not host_info_list: - return [] - - host_details = [] + return host_details # 仅支持一种主机格式 first_host_info = host_info_list[0] @@ -613,15 +637,22 @@ def get_host_detail(host_info_list: list, bk_biz_id: int = None): # 2. 无有效 ip 在后续执行 create_host_key 获取 bk_cloud_id 也会 KeyError # 3. 综上所述,提前返回可以减少无效执行逻辑及网络IO return [] - - hosts = list_biz_hosts(bk_biz_id, cond, "list_hosts_without_biz", source="get_host_detail:list_hosts_without_biz") + hosts: Union[RedisList, list] = list_biz_hosts( + bk_biz_id, + cond, + "list_hosts_without_biz", + source="get_host_detail:list_hosts_without_biz", + data_backend=data_backend, + ) bk_host_ids = [] bk_cloud_ids = [] for host in hosts: bk_host_ids.append(host["bk_host_id"]) bk_cloud_ids.append(host["bk_cloud_id"]) - host_relations = find_host_biz_relations(list(set(bk_host_ids)), source="get_host_detail") + host_relations: Union[RedisList, list] = find_host_biz_relations( + list(set(bk_host_ids)), source="get_host_detail", data_backend=data_backend + ) host_biz_map = {} for host in host_relations: host_biz_map[host["bk_host_id"]] = host["bk_biz_id"] @@ -632,8 +663,12 @@ def get_host_detail(host_info_list: list, bk_biz_id: int = None): all_biz_ids = list(set(host_biz_map.values()) - {settings.BK_CMDB_RESOURCE_POOL_BIZ_ID}) all_biz_info = fetch_biz_info(all_biz_ids) - host_key_dict = {} - host_id_dict = {} + host_key_dict: Union[RedisDict, dict] = DynamicContainer( + return_type=constants.DCReturnType.DICT.value, data_backend=data_backend + ).container + host_id_dict: Union[RedisDict, dict] = DynamicContainer( + return_type=constants.DCReturnType.DICT.value, data_backend=data_backend + ).container for _host in hosts: _host["bk_biz_id"] = host_biz_map[_host["bk_host_id"]] _host["bk_biz_name"] = ( @@ -652,23 +687,23 @@ def get_host_detail(host_info_list: list, bk_biz_id: int = None): ip = _host.get("bk_host_innerip") or _host.get("ip") or _host.get("bk_host_innerip_v6") host_key = f'{ip}-{_host["bk_cloud_id"]}-{constants.DEFAULT_SUPPLIER_ID}' host_key_dict[host_key] = _host - host_id_dict[_host["bk_host_id"]] = _host + host_id_dict[str(_host["bk_host_id"])] = _host for host_info in host_info_list: if "bk_host_id" in host_info: - if host_info["bk_host_id"] in host_id_dict: - host_info.update(host_id_dict[host_info["bk_host_id"]]) + if str(host_info["bk_host_id"]) in host_id_dict.keys(): + host_info.update(host_id_dict[str(host_info["bk_host_id"])]) host_details.append(host_info) else: host_key = create_host_key(host_info) - if host_key in host_key_dict: + if host_key in host_key_dict.keys(): host_info.update(host_key_dict[host_key]) host_details.append(host_info) return host_details -def add_host_module_info(host_biz_relations, instances): +def add_host_module_info(host_biz_relations, instances, data_backend: str = None): """ 增加主机的模块信息(为降低圈复杂度所写) :param host_biz_relations: 主机的业务相关信息 @@ -683,10 +718,18 @@ def add_host_module_info(host_biz_relations, instances): else: bk_host_module_map_id[relation["bk_host_id"]].append(relation["bk_module_id"]) + is_reids_data_backend: bool = data_backend == constants.DataBackend.REDIS.value + if is_reids_data_backend: + redis_instances: RedisList = RedisList() + for instance in instances: if "module" not in instance["host"]: instance["host"]["module"] = bk_host_module_map_id.get(instance["host"]["bk_host_id"]) - return instances + + if is_reids_data_backend: + redis_instances.append(instance) + + return redis_instances if is_reids_data_backend else instances def check_instances_object_type(nodes): @@ -729,11 +772,15 @@ def set_template_scope_nodes(scope): return scope["nodes"] -def get_host_relation(bk_biz_id, nodes): - data = [] - hosts = get_host_by_inst(bk_biz_id, nodes) +def get_host_relation(bk_biz_id, nodes, data_backend: str): + data: Union[RedisList, list] = DynamicContainer( + return_type=constants.DCReturnType.LIST.value, data_backend=data_backend + ).container + hosts: Union[RedisList, list] = get_host_by_inst(bk_biz_id, nodes, data_backend) - host_biz_relations = find_host_biz_relations([_host["bk_host_id"] for _host in hosts], source="get_host_relation") + host_biz_relations: Union[RedisList, list] = find_host_biz_relations( + [_host["bk_host_id"] for _host in hosts], source="get_host_relation", data_backend=data_backend + ) relations = defaultdict(lambda: defaultdict(list)) for item in host_biz_relations: @@ -769,7 +816,9 @@ def support_multi_biz(get_instances_by_scope_func): """支持scope多范围""" @wraps(get_instances_by_scope_func) - def wrapper(scope: Dict[str, Union[Dict, Any]], *args, **kwargs) -> Dict[str, Dict[str, Union[Dict, Any]]]: + def wrapper( + scope: Dict[str, Union[Dict, Any]], data_backend: str, *args, **kwargs + ) -> Dict[str, Dict[str, Union[Dict, Any]]]: nodes: typing.List[typing.Dict[str, typing.Union[str, int]]] = scope["nodes"] fill_nodes_biz_info(nodes=nodes) # 兼容只传bk_host_id的情况 @@ -778,12 +827,13 @@ def wrapper(scope: Dict[str, Union[Dict, Any]], *args, **kwargs) -> Dict[str, Di and scope["node_type"] == models.Subscription.NodeType.INSTANCE ): if None in [node.get("bk_biz_id") for node in scope["nodes"]]: - return get_instances_by_scope_func(scope, **kwargs) + return get_instances_by_scope_func(scope, data_backend, **kwargs) - instance_id_info_map = {} + instance_id_info_map: typing.Union[RedisDict, dict] = DynamicContainer(data_backend=data_backend).container nodes = sorted(scope["nodes"], key=lambda node: node.get("bk_biz_id") or scope.get("bk_biz_id")) params_list = [ { + "data_backend": data_backend, "scope": { "bk_biz_id": bk_biz_id, "object_type": scope["object_type"], @@ -796,7 +846,9 @@ def wrapper(scope: Dict[str, Union[Dict, Any]], *args, **kwargs) -> Dict[str, Di } for bk_biz_id, nodes in groupby(nodes, key=lambda x: x.get("bk_biz_id") or scope.get("bk_biz_id")) ] - results = request_multi_thread(get_instances_by_scope_func, params_list, get_data=lambda x: [x]) + results = request_multi_thread( + get_instances_by_scope_func, params_list, get_data=lambda x: [x], data_backend=data_backend + ) for result in results: instance_id_info_map.update(result) return instance_id_info_map @@ -824,7 +876,11 @@ def get_scope_labels_func( def get_instances_by_scope_with_checker( - scope: Dict[str, Union[Dict, int, Any]], steps: List[models.SubscriptionStep], *args, **kwargs + scope: Dict[str, Union[Dict, int, Any]], + steps: List[models.SubscriptionStep], + data_backend: str = constants.DataBackend.MEM.value, + *args, + **kwargs, ) -> Dict[str, Dict[str, Union[Dict, Any]]]: if "with_info" in scope: @@ -838,13 +894,15 @@ def get_instances_by_scope_with_checker( scope["with_info"]["process"] = True break - return get_instances_by_scope(scope, *args, **kwargs) + return get_instances_by_scope(scope, data_backend, *args, **kwargs) @support_multi_biz @SetupObserve(histogram=metrics.app_task_get_instances_by_scope_duration_seconds, get_labels_func=get_scope_labels_func) -@FuncCacheDecorator(cache_time=SUBSCRIPTION_SCOPE_CACHE_TIME) -def get_instances_by_scope(scope: Dict[str, Union[Dict, int, Any]]) -> Dict[str, Dict[str, Union[Dict, Any]]]: +@FuncCacheDecorator(cache_time=SUBSCRIPTION_SCOPE_CACHE_TIME, uuid_cache_enable=True) +def get_instances_by_scope( + scope: Dict[str, Union[Dict, int, Any]], data_backend: str +) -> Dict[str, Dict[str, Union[Dict, Any]]]: """ 获取范围内的所有主机 :param scope: dict { @@ -881,7 +939,6 @@ def get_instances_by_scope(scope: Dict[str, Union[Dict, int, Any]]) -> Dict[str, if instance_selector == []: return {} - instances = [] bk_biz_id = scope["bk_biz_id"] if bk_biz_id: module_to_topo = get_module_to_topo_dict(bk_biz_id) @@ -894,25 +951,30 @@ def get_instances_by_scope(scope: Dict[str, Union[Dict, int, Any]]) -> Dict[str, return {} need_register = scope.get("need_register", False) + + instances = DynamicContainer(return_type=constants.DCReturnType.LIST.value, data_backend=data_backend).container # 按照拓扑查询 if scope["node_type"] == models.Subscription.NodeType.TOPO: if scope["object_type"] == models.Subscription.ObjectType.HOST: - instances.extend([{"host": inst} for inst in get_host_relation(bk_biz_id, nodes)]) + instances.extend([{"host": inst} for inst in get_host_relation(bk_biz_id, nodes, data_backend)]) else: # 补充服务实例中的信息 instances.extend( - [{"service": inst} for inst in get_service_instance_by_inst(bk_biz_id, nodes, module_to_topo)] + [ + {"service": inst} + for inst in get_service_instance_by_inst( + bk_biz_id, nodes, module_to_topo, data_backend=data_backend + ) + ] ) # 按照实例查询 elif scope["node_type"] == models.Subscription.NodeType.INSTANCE: if scope["object_type"] == models.Subscription.ObjectType.HOST: - instances.extend( - [ - {"host": inst} - for inst in get_host_detail(nodes, bk_biz_id=bk_biz_id, source="get_instances_by_scope") - ] + host_detail: Union[RedisList, list] = get_host_detail( + nodes, bk_biz_id=bk_biz_id, source="get_instances_by_scope", data_backend=data_backend ) + instances.extend([{"host": inst} for inst in host_detail]) else: service_instance_ids = [int(node["id"]) for node in nodes] instances.extend( @@ -923,6 +985,7 @@ def get_instances_by_scope(scope: Dict[str, Union[Dict, int, Any]]) -> Dict[str, filter_id_list=service_instance_ids, filter_field_name=FilterFieldName.SERVICE_INSTANCE_IDS, ignore_exception=False, + data_backend=data_backend, ) ] ) @@ -936,11 +999,15 @@ def get_instances_by_scope(scope: Dict[str, Union[Dict, int, Any]]) -> Dict[str, bk_obj_id_set = check_instances_object_type(nodes) if scope["object_type"] == models.Subscription.ObjectType.HOST: # 补充实例所属模块ID - host_biz_relations = [] + host_biz_relations: Union[RedisList, list] = DynamicContainer( + return_type=constants.DCReturnType.LIST.value, data_backend=data_backend + ).container instances.extend( [ {"host": inst} - for inst in get_host_detail_by_template(list(bk_obj_id_set)[0], nodes, bk_biz_id=bk_biz_id) + for inst in get_host_detail_by_template( + list(bk_obj_id_set)[0], nodes, bk_biz_id=bk_biz_id, data_backend=data_backend + ) ] ) bk_host_id_chunks = chunk_lists([instance["host"]["bk_host_id"] for instance in instances], 500) @@ -954,26 +1021,36 @@ def get_instances_by_scope(scope: Dict[str, Union[Dict, int, Any]]) -> Dict[str, # 转化模板为节点 nodes = set_template_scope_nodes(scope) - instances = add_host_module_info(host_biz_relations, instances) + instances: Union[RedisList, list] = add_host_module_info( + host_biz_relations, instances, data_backend=data_backend + ) else: # 补充服务实例中的信息 # 转化模板为节点,**注意不可在get_service_instance_by_inst之后才转换** nodes = set_template_scope_nodes(scope) instances.extend( - [{"service": inst} for inst in get_service_instance_by_inst(bk_biz_id, nodes, module_to_topo)] + [ + {"service": inst} + for inst in get_service_instance_by_inst( + bk_biz_id, nodes, module_to_topo, data_backend=data_backend + ) + ] ) if not need_register: # 补充必要的主机或实例相关信息 - - add_host_info_to_instances(bk_biz_id, scope, instances) - add_scope_info_to_instances(nodes, scope, instances, module_to_topo) + instances: Union[RedisList, list] = add_host_info_to_instances(bk_biz_id, scope, instances, data_backend) + instances: Union[RedisList, list] = add_scope_info_to_instances( + nodes, scope, instances, module_to_topo, data_backend + ) if scope["with_info"]["process"]: - add_process_info_to_instances(bk_biz_id, scope, instances) + instances: Union[RedisList, list] = add_process_info_to_instances(bk_biz_id, scope, instances, data_backend) - instances_dict = {} + instances_dict: typing.Union[RedisDict, dict] = DynamicContainer( + data_backend=data_backend, cache_time=SUBSCRIPTION_SCOPE_CACHE_TIME + ).container data = { "object_type": scope["object_type"], "node_type": models.Subscription.NodeType.INSTANCE, @@ -997,7 +1074,7 @@ def get_instances_by_scope(scope: Dict[str, Union[Dict, int, Any]]) -> Dict[str, return_all_node_type=True, ).values_list("bk_host_id", flat=True) - selector_instances_dict = {} + selector_instances_dict: typing.Union[RedisDict, dict] = DynamicContainer(data_backend=data_backend).container for node_id, instance in instances_dict.items(): is_host = data["object_type"] == models.Subscription.ObjectType.HOST instance_data = instance["host"] if is_host else instance["service"] @@ -1010,28 +1087,44 @@ def get_instances_by_scope(scope: Dict[str, Union[Dict, int, Any]]) -> Dict[str, return instances_dict -def add_host_info_to_instances(bk_biz_id: int, scope: Dict, instances: Dict): +def add_host_info_to_instances(bk_biz_id: int, scope: Dict, instances: RedisList, data_backend: str): """ 补全实例的主机信息 :param bk_biz_id: 业务ID :param scope: 目标范围 :param instances: 实例列表 """ + is_redis_data_backend = data_backend == constants.DataBackend.REDIS.value + + redis_instances: Union[RedisList, list] = DynamicContainer( + return_type=constants.DCReturnType.LIST.value, data_backend=data_backend + ).container if scope["object_type"] != models.Subscription.ObjectType.SERVICE: # 补充缺省字段,兜底 cmdb_instance.service 的配置定义场景 for instance in instances: instance["service"] = instance.get("service") + if is_redis_data_backend: + redis_instances.append(instance) # 非服务实例,不需要补充实例主机信息 - return + return redis_instances if is_redis_data_backend else instances + + host_dict = DynamicContainer(data_backend=data_backend).container + + hosts_detail: Union[RedisList, list] = get_host_detail( + [instance["service"] for instance in instances], + bk_biz_id=bk_biz_id, + source="add_host_info_to_instances", + data_backend=data_backend, + ) + for host_info in hosts_detail: + host_dict[host_info["bk_host_id"]] = host_info - host_dict = { - host_info["bk_host_id"]: host_info - for host_info in get_host_detail( - [instance["service"] for instance in instances], bk_biz_id=bk_biz_id, source="add_host_info_to_instances" - ) - } for instance in instances: instance["host"] = host_dict[instance["service"]["bk_host_id"]] + if is_redis_data_backend: + redis_instances.append(instance) + + return redis_instances if is_redis_data_backend else instances def _add_scope_info_to_inst_instances(scope: Dict, instance: Dict): @@ -1077,7 +1170,9 @@ def _add_scope_info_to_topo_instances(scope: Dict, instance: Dict, nodes: List[D instance["scope"] = instance_scope -def add_scope_info_to_instances(nodes: List, scope: Dict, instances: List[Dict], module_to_topo: Dict[str, List]): +def add_scope_info_to_instances( + nodes: List, scope: Dict, instances: Union[RedisList, list], module_to_topo: Dict[str, List], data_backend: str +): """ 给实例添加目标范围信息 :param nodes: 节点列表 @@ -1086,20 +1181,34 @@ def add_scope_info_to_instances(nodes: List, scope: Dict, instances: List[Dict], :param module_to_topo: 模块拓扑 {"module|1": ["biz|2", "set|3", "module|1"]} :return: """ + is_redis_data_backend = data_backend == constants.DataBackend.REDIS.value + redis_instances: Union[RedisList, list] = DynamicContainer( + return_type=constants.DCReturnType.LIST.value, data_backend=data_backend + ).container for instance in instances: if scope["node_type"] == models.Subscription.NodeType.INSTANCE: _add_scope_info_to_inst_instances(scope, instance) else: _add_scope_info_to_topo_instances(scope, instance, nodes, module_to_topo) + if is_redis_data_backend: + redis_instances.append(instance) + + return redis_instances if is_redis_data_backend else instances -def _add_process_info_to_host_instances(bk_biz_id: int, instances: List[Dict]): + +def _add_process_info_to_host_instances(bk_biz_id: int, instances: Union[RedisList, list], data_backend: str): """ 给主机实例添加进程信息 :param bk_biz_id: 业务ID :param instances: 实例列表 """ + is_redis_data_backend = data_backend == constants.DataBackend.REDIS.value + redis_instances: Union[RedisList, list] = DynamicContainer( + return_type=constants.DCReturnType.LIST.value, data_backend=data_backend + ).container bk_host_list = [instance["host"]["bk_host_id"] for instance in instances] + # TODO REDIS_DICT host_processes = get_process_by_biz_id(bk_biz_id, bk_host_list) logger.info( f"[add_process_info_to_host_instances] instance_hosts_count -> {len(bk_host_list)}, " @@ -1108,22 +1217,34 @@ def _add_process_info_to_host_instances(bk_biz_id: int, instances: List[Dict]): for instance in instances: bk_host_id = instance["host"]["bk_host_id"] instance["process"] = host_processes[bk_host_id] + if is_redis_data_backend: + redis_instances.append(instance) + + return redis_instances if is_redis_data_backend else instances -def _add_process_info_to_service_instances(instances: List[Dict]): +def _add_process_info_to_service_instances(instances: List[Dict], data_backend: str = None): """ 给服务实例添加进程信息 :param instances: 实例列表 """ + is_redis_data_backend = data_backend == constants.DataBackend.REDIS.value + redis_instances: Union[RedisList, list] = DynamicContainer( + return_type=constants.DCReturnType.LIST.value, data_backend=data_backend + ).container for instance in instances: processes = {} for process in instance["service"].get("process_instances") or []: processes[process["process"]["bk_process_name"]] = process["process"] instance["process"] = processes del instance["service"]["process_instances"] + if is_redis_data_backend: + redis_instances.append(instance) + + return redis_instances if is_redis_data_backend else instances -def add_process_info_to_instances(bk_biz_id: int, scope, instances): +def add_process_info_to_instances(bk_biz_id: int, scope, instances: RedisList, data_backend: str): """ 给实例列表添加进程信息 :param bk_biz_id: 业务 @@ -1131,9 +1252,9 @@ def add_process_info_to_instances(bk_biz_id: int, scope, instances): :param instances: 实例列表 """ if scope["object_type"] == models.Subscription.ObjectType.HOST: - _add_process_info_to_host_instances(bk_biz_id, instances) + return _add_process_info_to_host_instances(bk_biz_id, instances, data_backend) else: - _add_process_info_to_service_instances(instances) + return _add_process_info_to_service_instances(instances, data_backend) def get_plugin_path(plugin_name: str, target_host: models.Host, agent_config: Dict) -> Dict: diff --git a/apps/backend/tests/subscription/test_performance.py b/apps/backend/tests/subscription/test_performance.py index 67ffbc442a..e17ee8ebc6 100644 --- a/apps/backend/tests/subscription/test_performance.py +++ b/apps/backend/tests/subscription/test_performance.py @@ -215,7 +215,7 @@ class TestPerformance(TestCase): @classmethod def mock_get_instance_by_scope(cls, instance_num): - def get_instances_by_scope(scope, *args, **kwargs): + def get_instances_by_scope(scope, data_backend, *args, **kwargs): if scope["nodes"]: mocked_instances = {} for i in range(instance_num): diff --git a/apps/core/concurrent/cache.py b/apps/core/concurrent/cache.py index af3c85955d..b56123083c 100644 --- a/apps/core/concurrent/cache.py +++ b/apps/core/concurrent/cache.py @@ -10,6 +10,7 @@ """ import typing +import uuid import ujson as json import wrapt @@ -19,6 +20,7 @@ from apps.prometheus import metrics from apps.prometheus.helper import observe from apps.utils.cache import format_cache_key +from apps.utils.redis import RedisDict, RedisList from env.constants import CacheBackend DEFAULT_CACHE_TIME = 60 * 15 @@ -28,11 +30,12 @@ class FuncCacheDecorator: cache_time: int = DEFAULT_CACHE_TIME - def __init__(self, cache_time: typing.Optional[int] = None): + def __init__(self, cache_time: typing.Optional[int] = None, uuid_cache_enable: bool = False): """ :param cache_time: 缓存事件(秒) """ self.cache_time = cache_time or DEFAULT_CACHE_TIME + self.uuid_cache_enable = uuid_cache_enable def get_from_cache(self, using: str, key: str) -> typing.Any: cache = caches[using] @@ -42,12 +45,37 @@ def get_from_cache(self, using: str, key: str) -> typing.Any: if using == CacheBackend.DB.value: return json.loads(func_result) + + if self.uuid_cache_enable: + if isinstance(func_result, str) and func_result.startswith("data_backend_redis"): + data_type = func_result.split("_")[-2] + if data_type == "dict": + return RedisDict(cache_uuid_key=func_result) + elif data_type == "list": + return RedisList(cache_uuid_key=func_result) + return func_result def set_to_cache(self, using: str, key: str, value: typing.Any): cache = caches[using] if using == CacheBackend.DB.value: value = json.dumps(value) + + if self.uuid_cache_enable: + # 变量上下文会将原本的uuid_key删除,这里重新命名,即可达到缓存目的 + new_uuid_key = None + if isinstance(value, RedisDict): + new_uuid_key = f"data_backend_redis_dict_{uuid.uuid4().hex}" + elif isinstance(value, RedisList): + new_uuid_key = f"data_backend_redis_list_{uuid.uuid4().hex}" + + if new_uuid_key: + if value.client.exists(value.uuid_key): + value._update_redis_expiry(self.cache_time) + value.client.rename(value.uuid_key, new_uuid_key) + value.cache_uuid_key = new_uuid_key + value = new_uuid_key + cache.set(key, value, self.cache_time) def ttl_from_cache(self, using: str, key: str) -> int: @@ -76,7 +104,6 @@ def __call__( :param kwargs: 关键字参数 :return: """ - func_result: typing.Any = None func_name: str = wrapped.__name__ use_fast_cache: bool = False diff --git a/apps/core/gray/tools.py b/apps/core/gray/tools.py index 095cd29257..5ebee647ea 100644 --- a/apps/core/gray/tools.py +++ b/apps/core/gray/tools.py @@ -16,6 +16,7 @@ from apps.exceptions import ApiError from apps.node_man import constants as node_man_constants from apps.node_man import models as node_man_models +from apps.utils.redis import DynamicContainer, RedisDict from env.constants import GseVersion @@ -67,13 +68,14 @@ def get_host_ap_gse_version(self, bk_biz_id: typing.Any, ap_id: int, is_install_ return gse_version def inject_meta_to_instances( - self, instances: typing.Dict[str, typing.Dict[str, typing.Union[typing.Dict, typing.Any]]] + self, instances: typing.Dict[str, typing.Dict[str, typing.Union[typing.Dict, typing.Any]]], data_backend: str ): """ 在 instances 中注入 Meta 信息 :param instances: :return: """ + injected_instances: typing.Union[RedisDict, dict] = DynamicContainer(data_backend=data_backend).container bk_host_ids: typing.Set[int] = { instance_info["host"]["bk_host_id"] for instance_info in instances.values() @@ -111,6 +113,10 @@ def inject_meta_to_instances( meta["GSE_VERSION"] = gse_version instance_info["meta"] = meta + injected_instances[instance_id] = instance_info + + return injected_instances + @classmethod def get_gray_ap_map(cls) -> typing.Dict[int, int]: # 获取GSE2.0灰度 接入点映射关系 diff --git a/apps/node_man/constants.py b/apps/node_man/constants.py index cde490d4ae..fb161d278f 100644 --- a/apps/node_man/constants.py +++ b/apps/node_man/constants.py @@ -1234,3 +1234,13 @@ def _get_member__alias_map(cls) -> Dict[Enum, str]: @classmethod def cpu_type__os_bit_map(cls): return {CpuType.x86: cls.BIT32.value, CpuType.x86_64: cls.BIT64.value, CpuType.aarch64: cls.ARM.value} + + +class DataBackend(EnhanceEnum): + MEM = "MEM" + REDIS = "REDIS" + + +class DCReturnType(EnhanceEnum): + DICT = "DICT" + LIST = "LIST" diff --git a/apps/node_man/handlers/policy.py b/apps/node_man/handlers/policy.py index 73217e6353..cbd1a74b3b 100644 --- a/apps/node_man/handlers/policy.py +++ b/apps/node_man/handlers/policy.py @@ -36,6 +36,7 @@ from apps.utils import concurrent from apps.utils.basic import distinct_dict_list from apps.utils.local import get_request_username +from apps.utils.redis import RedisDict from common.api import NodeApi logger = logging.getLogger("app") @@ -136,9 +137,9 @@ def search_deploy_policy(query_params: Dict[str, Any]) -> Dict[str, Any]: all_policy_ids = [policy["id"] for policy in all_policies] # 查询每个策略下最新的任务 - sub_task_infos = models.SubscriptionTask.objects.filter( - subscription_id__in=all_policy_ids - ).values("id", "subscription_id") + sub_task_infos = models.SubscriptionTask.objects.filter(subscription_id__in=all_policy_ids).values( + "id", "subscription_id" + ) max_sub_task_id_list, task_ids_gby_sub_id = [], defaultdict(list) for sub_task_dict in sub_task_infos: @@ -362,7 +363,7 @@ def selected_preview(query_params: Dict[str, Any]) -> Dict[str, Any]: @staticmethod def get_host_id__plugin_version_map( - step_objs: List[models.SubscriptionStep], instance_id__inst_info_map: Dict[str, Dict] + step_objs: List[models.SubscriptionStep], instance_id__inst_info_map: RedisDict ) -> Dict[int, str]: project = None for step_obj in step_objs: @@ -478,7 +479,7 @@ def migrate_preview(cls, query_params: Dict[str, Any]) -> List[Dict[str, Any]]: action_instance_map = defaultdict(list) instance_actions = preview_result["instance_actions"] instance_migrate_reasons = preview_result["instance_migrate_reasons"] - instance_id__inst_info_map = preview_result["instance_id__inst_info_map"] + instance_id__inst_info_map: Union[RedisDict, dict] = preview_result["instance_id__inst_info_map"] host_id__plugin_version_map = cls.get_host_id__plugin_version_map(step_objs, instance_id__inst_info_map) for instance_id, instance_record in preview_result["to_be_created_records_map"].items(): host_info = instance_record.instance_info["host"] diff --git a/apps/node_man/models.py b/apps/node_man/models.py index 1eb2c3d1cf..4c9d6a1de2 100644 --- a/apps/node_man/models.py +++ b/apps/node_man/models.py @@ -42,6 +42,7 @@ from django_mysql.models import JSONField from jinja2 import Template +from apps.backend.constants import InstNodeType from apps.backend.subscription.errors import PipelineExecuteFailed, SubscriptionNotExist from apps.backend.subscription.render_functions import get_hosts_by_node from apps.backend.utils.data_renderer import nested_render_data @@ -181,6 +182,10 @@ class KeyEnum(Enum): QUERY_PROC_STATUS_HOST_LENS = "QUERY_PROC_STATUS_HOST_LENS" # 业务最大插件版本 PLUGIN_VERSION_CONFIG = "PLUGIN_VERSION_CONFIG" + # pipeline 主机数量配置 + TASK_HOST_LIMIT_CONFIG = "TASK_HOST_LIMIT_CONFIG" + # pipeline 主机数量配置 + SUBSCRIPTION_DATA_REDIS_BIZ_LIST = "SUBSCRIPTION_DATA_REDIS_BIZ_LIST" key = models.CharField(_("键"), max_length=255, db_index=True, primary_key=True) v_json = JSONField(_("值")) @@ -2174,6 +2179,34 @@ def _construct_return_data( ) return _construct_return_data(False, sub_inst_bk_obj_id, _ordered_bk_obj_subs=ordered_bk_obj_subs) + @property + def is_multi_paralle_gateway(self) -> bool: + cache_key = f"{self.id}_is_multi_paralle_gateway" + if cache.get(cache_key): + return cache.get(cache_key) + + # bk_biz_ids: List[int] = list( + # Host.objects.filter(bk_host_id__in=bk_host_ids).values_list("bk_biz_id", flat=True)) + bk_biz_ids = set() + if self.bk_biz_id: + bk_biz_ids.add(self.bk_biz_id) + + for node in self.nodes: + if node.get("bk_biz_id") is not None: + bk_biz_ids.add(node["bk_biz_id"]) + + if node.get("instance_info", {}).get("bk_biz_id") is not None: + bk_biz_ids.add(node["instance_info"]["bk_biz_id"]) + + if node.get("bk_obj_id") == InstNodeType.BIZ and node.get("bk_inst_id") is not None: + bk_biz_ids.add(node["bk_inst_id"]) + redis_biz_list: List[int] = GlobalSettings.get_config( + GlobalSettings.KeyEnum.SUBSCRIPTION_DATA_REDIS_BIZ_LIST.value, default=[] + ) + is_multi_paralle_gateway: bool = bool(set(bk_biz_ids) & set(redis_biz_list)) + cache.set(cache_key, is_multi_paralle_gateway) + return is_multi_paralle_gateway + class Meta: verbose_name = _("订阅(Subscription)") verbose_name_plural = _("订阅(Subscription)") @@ -2434,7 +2467,7 @@ class PipelineTree(models.Model): def run(self, priority=None): # 根据流程描述结构创建流程对象 - parser = PipelineParser(pipeline_tree=self.tree) + parser = PipelineParser(pipeline_tree=self.tree, cycle_tolerate=True) pipeline = parser.parse() if priority is not None: action_result = task_service.run_pipeline(pipeline, priority=priority) diff --git a/apps/node_man/tests/test_handlers/test_policy.py b/apps/node_man/tests/test_handlers/test_policy.py index 3b4cc443c1..67be9d04f3 100644 --- a/apps/node_man/tests/test_handlers/test_policy.py +++ b/apps/node_man/tests/test_handlers/test_policy.py @@ -35,7 +35,7 @@ from apps.utils.unittest.testcase import CustomBaseTestCase -def get_instances_by_scope(scope, **kwargs): +def get_instances_by_scope(scope, data_backend, **kwargs): host_id = scope["nodes"][0]["bk_host_id"] host = Host.objects.filter(bk_host_id=host_id) instance_key = f"host|instance|host|{host_id}" diff --git a/apps/utils/batch_request.py b/apps/utils/batch_request.py index a25a1a5e54..571b0fba53 100644 --- a/apps/utils/batch_request.py +++ b/apps/utils/batch_request.py @@ -12,6 +12,7 @@ from concurrent.futures import ThreadPoolExecutor, as_completed from copy import deepcopy from multiprocessing.pool import ThreadPool +from typing import Union from django.conf import settings from django.utils.translation import get_language @@ -19,6 +20,7 @@ from apps.exceptions import AppBaseException from apps.node_man import constants from apps.utils.local import get_request +from apps.utils.redis import DynamicContainer, RedisDict, RedisList from . import translation from .concurrent import inject_request @@ -54,6 +56,7 @@ def batch_request( sort=None, split_params=False, interval=0, + data_backend: str = None, ): """ 异步并发请求接口 @@ -73,14 +76,16 @@ def batch_request( return sync_batch_request(func, params, get_data, limit) start = 0 - data = [] + data: Union[RedisList, list] = DynamicContainer( + return_type=constants.DCReturnType.LIST.value, data_backend=data_backend + ).container if not split_params: request_params = dict(page={"start": 0, "limit": limit}, **params) if sort: request_params["page"]["sort"] = sort query_res = func(request_params) final_request_params = [{"count": get_count(query_res), "params": params}] - data = get_data(query_res) or [] + data.extend(get_data(query_res) or []) # 如果count小于等于limit,直接返回 if final_request_params[0]["count"] <= limit: return data @@ -142,7 +147,7 @@ def sync_batch_request(func, params, get_data=lambda x: x["info"], limit=500): return data -def request_multi_thread(func, params_list, get_data=lambda x: []): +def request_multi_thread(func, params_list, get_data=lambda x: [], data_backend: str = None): """ 并发请求接口,每次按不同参数请求最后叠加请求结果 :param func: 请求方法 @@ -161,11 +166,18 @@ def request_multi_thread(func, params_list, get_data=lambda x: []): if "params" in params: params["params"]["_request"] = _request - result = [] + result: Union[RedisList, list] = DynamicContainer( + return_type=constants.DCReturnType.LIST.value, data_backend=data_backend + ).container with ThreadPoolExecutor(max_workers=settings.CONCURRENT_NUMBER) as ex: tasks = [ ex.submit(translation.RespectsLanguage(language=get_language())(func), **params) for params in params_list ] for future in as_completed(tasks): - result.extend(get_data(future.result())) + _result = future.result() + if isinstance(_result, RedisDict): + for k, v in _result.items(): + result.extend(get_data({k: v})) + else: + result.extend(get_data(_result)) return result diff --git a/apps/utils/concurrent.py b/apps/utils/concurrent.py index eafac0d259..7cf85abf70 100644 --- a/apps/utils/concurrent.py +++ b/apps/utils/concurrent.py @@ -15,14 +15,16 @@ from concurrent.futures import as_completed from concurrent.futures.thread import ThreadPoolExecutor from multiprocessing import cpu_count, get_context -from typing import Callable, Coroutine, Dict, List +from typing import Callable, Coroutine, Dict, List, Union from asgiref.sync import async_to_sync from django.conf import settings from django.utils.translation import get_language from apps.exceptions import AppBaseException +from apps.node_man.constants import DCReturnType from apps.utils import local +from apps.utils.redis import DynamicContainer, RedisList from . import translation @@ -48,6 +50,7 @@ def batch_call( get_data=lambda x: x, extend_result: bool = False, interval: float = 0, + data_backend: str = None, **kwargs ) -> List: """ @@ -61,7 +64,9 @@ def batch_call( :return: 请求结果累计 """ - result = [] + result: Union[RedisList, list] = DynamicContainer( + return_type=DCReturnType.LIST.value, data_backend=data_backend + ).container # 不存在参数列表,直接返回 if not params_list: diff --git a/apps/utils/redis.py b/apps/utils/redis.py new file mode 100644 index 0000000000..64cc8b5aea --- /dev/null +++ b/apps/utils/redis.py @@ -0,0 +1,175 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-节点管理(BlueKing-BK-NODEMAN) available. +Copyright (C) 2017-2022 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" +import json +import uuid +from collections.abc import Iterator +from typing import Any, Generator, Iterable, Optional, Tuple + +from _collections_abc import dict_keys +from django.conf import settings +from django_redis import get_redis_connection + +from apps.node_man.constants import DataBackend, DCReturnType + +# 过期时间为30分钟 +REDIS_CACHE_DATA_TIMEOUT = 60 * 30 +# 每次取值的长度 +REDIS_CACHE_DATA_LENGTH = 1000 + + +class RedisHashScanner: + def __init__(self, key: str, match: Optional[str] = None, count: int = REDIS_CACHE_DATA_LENGTH): + self.redis_client = get_redis_connection() + self.key = key + self.match = match + self.count = count + + def __iter__(self) -> Generator[Tuple[str, str], None, None]: + cursor = "0" + while cursor != 0: + cursor, data = self.redis_client.hscan(self.key, cursor=cursor, match=self.match, count=self.count) + for field, value in data.items(): + yield field, json.loads(value) + + +class RedisHashValuesScanner(RedisHashScanner): + def __iter__(self) -> Generator[Tuple[str, str], None, None]: + cursor = "0" + while cursor != 0: + cursor, data = self.redis_client.hscan(self.key, cursor=cursor, match=self.match, count=self.count) + for _, value in data.items(): + yield json.loads(value) + + +class RedisListIterator: + def __init__(self, key: str, batch_size: int = REDIS_CACHE_DATA_LENGTH): + self.redis_client = get_redis_connection() + self.key = key + self.batch_size = batch_size + + def __iter__(self) -> Generator[Tuple[str, str], None, None]: + start = 0 + while True: + end = start + self.batch_size - 1 + elements = self.redis_client.lrange(self.key, start, end) + if not elements: + break + yield from (json.loads(element) for element in elements) + start += self.batch_size + + +class RedisDataBase: + def __init__(self, uuid_key: str = None, cache_uuid_key: str = None, cache_time=None): + self.cache_uuid_key = cache_uuid_key + self.uuid_key = uuid_key or f"{uuid.uuid4().hex}" + self.client = get_redis_connection() + self.cache_time = cache_time or REDIS_CACHE_DATA_TIMEOUT + self._update_redis_expiry() + + def _update_redis_expiry(self, cache_time=None): + self.client.expire(self.cache_uuid_key or self.uuid_key, cache_time or self.cache_time) + + def __del__(self): + self.client.delete(self.uuid_key) + + def __enter__(self): + return self + + def __exit__(self, *args, **kwargs): + self.__del__() + + +class RedisDict(RedisDataBase, dict): + def __setitem__(self, key, value): + self.client.hset(self.cache_uuid_key or self.uuid_key, key, json.dumps(value)) + self._update_redis_expiry() + + def update(self, *args, **kwargs): + temp_dict = {} + for k, v in dict(*args, **kwargs).items(): + temp_dict[k] = json.dumps(v) + if temp_dict: + self.client.hset(self.cache_uuid_key or self.uuid_key, mapping=temp_dict) + self._update_redis_expiry() + + def __getitem__(self, key: Any) -> Any: + data = json.loads(self.client.hget(self.cache_uuid_key or self.uuid_key, key) or "null") + if data: + return data + raise KeyError() + + def __len__(self) -> int: + return self.client.hlen(self.cache_uuid_key or self.uuid_key) + + def keys(self) -> dict_keys: + return self.client.hkeys(self.cache_uuid_key or self.uuid_key) + + def get(self, key: Any, default=None): + data = json.loads(self.client.hget(self.cache_uuid_key or self.uuid_key, key) or "null") + return data or default + + def values(self): + return RedisHashValuesScanner(self.cache_uuid_key or self.uuid_key) + + def items(self): + return RedisHashScanner(self.cache_uuid_key or self.uuid_key) + + def __str__(self): + return self.uuid_key + + +class RedisList(RedisDataBase, list): + def __iter__(self) -> Iterator: + self.index = 0 + return self + + def __next__(self): + if self.index < self.client.llen(self.cache_uuid_key or self.uuid_key): + item = self.client.lindex(self.cache_uuid_key or self.uuid_key, self.index) + self.index += 1 + return json.loads(item) + else: + raise StopIteration + + def extend(self, iterable: Iterable[Any]) -> None: + serialized_items = [json.dumps(item) for item in iterable] + if serialized_items: + self.client.rpush(self.cache_uuid_key or self.uuid_key, *serialized_items) + self._update_redis_expiry() + + def append(self, obj: Any) -> None: + self.client.rpush(self.cache_uuid_key or self.uuid_key, json.dumps(obj)) + self._update_redis_expiry() + + def __len__(self) -> int: + return self.client.llen(self.cache_uuid_key or self.uuid_key) + + +class DynamicContainer: + def __init__( + self, + return_type: str = DCReturnType.DICT.value, + data_backend: str = DataBackend.REDIS.value, + cache_time: int = None, + ): + + if settings.DATA_BACKEND == DataBackend.REDIS.value or data_backend == DataBackend.REDIS.value: + self._container = ( + RedisDict(cache_time=cache_time) + if return_type == DCReturnType.DICT.value + else RedisList(cache_time=cache_time) + ) + else: + self._container = {} if return_type == DCReturnType.DICT.value else [] + + @property + def container(self): + return self._container diff --git a/config/default.py b/config/default.py index 9c5e88962c..42f97015ec 100644 --- a/config/default.py +++ b/config/default.py @@ -377,6 +377,8 @@ } +# DATA BACKEND +DATA_BACKEND = os.getenv("BKAPP_DATA_BACKEND", "MEM") ESB_SDK_NAME = "blueking.component" BKCRYPTO = { diff --git a/pipeline/apps.py b/pipeline/apps.py index 959f3fa664..6daf17dfab 100644 --- a/pipeline/apps.py +++ b/pipeline/apps.py @@ -10,7 +10,7 @@ an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ - +import sys import logging import traceback @@ -98,3 +98,5 @@ def ready(self): == "pipeline.engine.core.data.redis_backend.RedisDataBackend" ): logger.error("can not find REDIS in settings!") + + sys.setrecursionlimit(10000) diff --git a/pipeline/engine/core/handlers/service_activity.py b/pipeline/engine/core/handlers/service_activity.py index c2cd55bd9a..f14e0a5ab7 100644 --- a/pipeline/engine/core/handlers/service_activity.py +++ b/pipeline/engine/core/handlers/service_activity.py @@ -17,8 +17,9 @@ from pipeline.conf import default_settings from pipeline.core.data.hydration import hydrate_node_data from pipeline.core.flow.activity import ServiceActivity +from pipeline.core.flow.gateway import ConvergeGateway from pipeline.django_signal_valve import valve -from pipeline.engine import signals +from pipeline.engine import signals, states from pipeline.engine.models import Data, ScheduleService, Status from .base import FlowElementHandler @@ -34,6 +35,24 @@ def element_cls(): return ServiceActivity def handle(self, process, element, status): + + is_multi_paralle_gateway: bool = isinstance(element, ServiceActivity) and element.data.get_one_of_inputs( + "is_multi_paralle_gateway" + ) + + if is_multi_paralle_gateway and element.incoming: + incoming_service = element.incoming.flows[0].source.service + act_id = incoming_service.id + if all( + [incoming_service.need_schedule(), not Status.objects.filter(id=act_id, state=states.FINISHED).exists()] + ): + next_node = element.next() + while is_multi_paralle_gateway: + # 跳过所有未执行的ServiceActivity寻找最近的ConvergeGateway + if isinstance(next_node, ConvergeGateway): + return self.HandleResult(next_node=next_node, should_return=False, should_sleep=False) + next_node = next_node.next() + success = False exception_occurred = False monitoring = False @@ -70,7 +89,8 @@ def handle(self, process, element, status): monitoring = True element.setup_runtime_attrs( - id=element.id, root_pipeline_id=root_pipeline.id, + id=element.id, + root_pipeline_id=root_pipeline.id, ) # execute service @@ -111,6 +131,13 @@ def handle(self, process, element, status): subprocess_id_stack=process.subprocess_stack, ) + next_node = element.next() + while is_multi_paralle_gateway: + # 跳过所有未执行的ServiceActivity寻找最近的ConvergeGateway + if isinstance(next_node, ConvergeGateway): + return self.HandleResult(next_node=next_node, should_return=False, should_sleep=False) + next_node = next_node.next() + return self.HandleResult(next_node=None, should_return=False, should_sleep=True) else: is_error_ignored = element.error_ignorable and not element.get_result_bit() diff --git a/pipeline/engine/core/schedule.py b/pipeline/engine/core/schedule.py index a4eb829758..8f71866061 100644 --- a/pipeline/engine/core/schedule.py +++ b/pipeline/engine/core/schedule.py @@ -19,8 +19,18 @@ from pipeline.django_signal_valve import valve from pipeline.engine import exceptions, signals, states -from pipeline.engine.core.data import delete_parent_data, get_schedule_parent_data, set_schedule_data -from pipeline.engine.models import Data, MultiCallbackData, PipelineProcess, ScheduleService, Status +from pipeline.engine.core.data import ( + delete_parent_data, + get_schedule_parent_data, + set_schedule_data, +) +from pipeline.engine.models import ( + Data, + MultiCallbackData, + PipelineProcess, + ScheduleService, + Status, +) logger = logging.getLogger("celery") @@ -31,7 +41,7 @@ def schedule_exception_handler(process_id, schedule_id): yield except Exception as e: activity_id = schedule_id[: ScheduleService.SCHEDULE_ID_SPLIT_DIVISION] - version = schedule_id[ScheduleService.SCHEDULE_ID_SPLIT_DIVISION:] + version = schedule_id[ScheduleService.SCHEDULE_ID_SPLIT_DIVISION :] if Status.objects.filter(id=activity_id, version=version).exists(): logger.error(traceback.format_exc()) process = PipelineProcess.objects.get(id=process_id) @@ -158,13 +168,16 @@ def schedule(process_id, schedule_id, data_id=None): Data.objects.write_node_data(service_act, ex_data=ex_data) + is_multi_paralle_gateway = service_act.data.get_one_of_inputs("is_multi_paralle_gateway") + with transaction.atomic(): process = PipelineProcess.objects.select_for_update().get(id=sched_service.process_id) if not process.is_alive: logger.info("pipeline %s has been revoked, status adjust failed." % process.root_pipeline_id) return - process.adjust_status() + if not is_multi_paralle_gateway: + process.adjust_status() # send activity error signal try: @@ -172,18 +185,36 @@ def schedule(process_id, schedule_id, data_id=None): except Exception: logger.error("schedule_fail handler fail: %s" % traceback.format_exc()) - signals.service_schedule_fail.send( - sender=ScheduleService, activity_shell=service_act, schedule_service=sched_service, ex_data=ex_data - ) + if is_multi_paralle_gateway: + + signals.service_schedule_success.send( + sender=ScheduleService, activity_shell=service_act, schedule_service=sched_service + ) + + valve.send( + signals, + "wake_from_schedule", + sender=ScheduleService, + process_id=sched_service.process_id, + activity_id=sched_service.activity_id, + ) + else: + signals.service_schedule_fail.send( + sender=ScheduleService, + activity_shell=service_act, + schedule_service=sched_service, + ex_data=ex_data, + ) + + valve.send( + signals, + "activity_failed", + sender=process.root_pipeline, + pipeline_id=process.root_pipeline_id, + pipeline_activity_id=service_act.id, + subprocess_id_stack=process.subprocess_stack, + ) - valve.send( - signals, - "activity_failed", - sender=process.root_pipeline, - pipeline_id=process.root_pipeline_id, - pipeline_activity_id=service_act.id, - subprocess_id_stack=process.subprocess_stack, - ) return # schedule execute finished or one time callback finished diff --git a/support-files/kubernetes/helm/bk-nodeman/templates/configmaps/env-configmap.yaml b/support-files/kubernetes/helm/bk-nodeman/templates/configmaps/env-configmap.yaml index cf4a9857c5..b04209de9d 100644 --- a/support-files/kubernetes/helm/bk-nodeman/templates/configmaps/env-configmap.yaml +++ b/support-files/kubernetes/helm/bk-nodeman/templates/configmaps/env-configmap.yaml @@ -138,3 +138,4 @@ data: TXY_SECRETID: "{{ .Values.config.TXYSecretId }}" TXY_SECRETKEY: "{{ .Values.config.TXYSecretKey }}" BKAPP_UNASSIGNED_CLOUD_ID: "{{ .Values.config.bkAppUnassignedCloudId}}" + BKAPP_DATA_BACKEND: '{{ .Values.config.bkAppDataBackend }}' diff --git a/support-files/templates/nodeman#bin#environ.sh b/support-files/templates/nodeman#bin#environ.sh index 849dd3199a..84d0555a18 100755 --- a/support-files/templates/nodeman#bin#environ.sh +++ b/support-files/templates/nodeman#bin#environ.sh @@ -102,4 +102,5 @@ export GSE_ENVIRON_DIR="__BK_GSE_ENVIRON_DIR__" export GSE_ENVIRON_WIN_DIR="__BK_GSE_ENVIRON_WIN_DIR__" export BKAPP_ENABLE_DHCP="__BK_NODEMAN_ENABLE_DHCP__" -export BKAPP_BK_GSE_APIGATEWAY="__BK_API_GATEWAY_GSE_URL__" \ No newline at end of file +export BKAPP_BK_GSE_APIGATEWAY="__BK_API_GATEWAY_GSE_URL__" +export BKAPP_DATA_BACKEND="__BKAPP_NODEMAN_DATA_BACKEND__" \ No newline at end of file