Skip to content

Commit

Permalink
feat: 超大订阅方案 (closed #2429)
Browse files Browse the repository at this point in the history
  • Loading branch information
wyyalt committed Nov 7, 2024
1 parent f14f01b commit a816e37
Show file tree
Hide file tree
Showing 25 changed files with 847 additions and 242 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ repos:
name: black
language: python
- repo: https://github.com/pycqa/isort
rev: 5.5.4
rev: 5.12.0
hooks:
- id: isort
args: [ "--profile", "black" ]
Expand Down
1 change: 1 addition & 0 deletions apps/backend/agent/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
class AgentServiceActivity(ServiceActivity):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.is_multi_paralle_gateway: bool = False
self.component.inputs.meta = Var(type=Var.SPLICE, value="${meta}")
self.component.inputs.description = Var(type=Var.SPLICE, value="${description}")
self.component.inputs.blueking_language = Var(type=Var.SPLICE, value="${blueking_language}")
Expand Down
25 changes: 23 additions & 2 deletions apps/backend/components/collections/agent_new/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from apps.prometheus.helper import SetupObserve

from .. import job
from ..base import BaseService, CommonData
from ..base import BaseService, CommonData, RedisCommonData

logger = logging.getLogger("celery")

Expand All @@ -49,6 +49,25 @@ class AgentCommonData(CommonData):
injected_ap_id: int


class RedisAgentCommonData(RedisCommonData):

# 默认接入点
def default_ap(self) -> models.AccessPoint:
return self._get_attr_from_redis("default_ap")

# 主机ID - 接入点 映射关系
def host_id__ap_map(self) -> Dict[int, models.AccessPoint]:
return self._get_attr_from_redis("host_id__ap_map")

# AgentStep 适配器
def agent_step_adapter(self) -> AgentStepAdapter:
return self._get_attr_from_redis("agent_step_adapter")

# 注入AP_ID
def injected_ap_id(self) -> int:
return self._get_attr_from_redis("injected_ap_id")


class AgentBaseService(BaseService, metaclass=abc.ABCMeta):
"""
AGENT安装基类
Expand Down Expand Up @@ -91,7 +110,9 @@ def get_common_data(cls, data):
common_data.subscription_step, gse_version=data.get_one_of_inputs("meta", {}).get("GSE_VERSION")
)

return AgentCommonData(
agent_common_data_cls = AgentCommonData if isinstance(common_data, CommonData) else RedisAgentCommonData

return agent_common_data_cls(
bk_host_ids=common_data.bk_host_ids,
host_id_obj_map=common_data.host_id_obj_map,
ap_id_obj_map=common_data.ap_id_obj_map,
Expand Down
78 changes: 77 additions & 1 deletion apps/backend/components/collections/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
import ast
import logging
import os
import pickle
import traceback
import typing
import uuid
from collections import defaultdict
from dataclasses import dataclass
from typing import (
Expand All @@ -32,6 +35,7 @@
from django.db.models.functions import Concat
from django.utils import timezone
from django.utils.translation import ugettext as _
from django_redis import get_redis_connection

from apps.adapters.api.gse import GseApiBaseHelper, get_gse_api_helper
from apps.backend.api.constants import POLLING_TIMEOUT
Expand All @@ -45,6 +49,7 @@
from apps.prometheus.helper import SetupObserve
from apps.utils import cache, time_handler, translation
from apps.utils.exc import ExceptionHandler
from apps.utils.redis import REDIS_CACHE_DATA_TIMEOUT
from pipeline.core.flow import Service

logger = logging.getLogger("celery")
Expand Down Expand Up @@ -263,12 +268,80 @@ class CommonData:
subscription_instance_ids: Set[int]


class RedisCommonData:
def __init__(self, *args, **kwargs):
self.uuid_key = uuid.uuid4().hex
self.client = get_redis_connection()

for k, v in dict(*args, **kwargs).items():
self.client.hset(self.uuid_key, k, str(pickle.dumps(v)))

self.client.expire(self.uuid_key, REDIS_CACHE_DATA_TIMEOUT)

def _get_attr_from_redis(self, key):
return pickle.loads(ast.literal_eval(self.client.hget(self.uuid_key, key)))

def __del__(self):
self.client.delete(self.uuid_key)

def __enter__(self):
return self

def __exit__(self, *args, **kwargs):
self.__del__()

@property
def bk_host_ids(self) -> Set[int]:
return self._get_attr_from_redis("bk_host_ids")

@property
def host_id_obj_map(self) -> Dict[int, models.Host]:
return self._get_attr_from_redis("host_id_obj_map")

@property
def sub_inst_id__host_id_map(self) -> Dict[int, int]:
return self._get_attr_from_redis("sub_inst_id__host_id_map")

@property
def host_id__sub_inst_id_map(self) -> Dict[int, int]:
return self._get_attr_from_redis("host_id__sub_inst_id_map")

@property
def ap_id_obj_map(self) -> Dict[int, models.AccessPoint]:
return self._get_attr_from_redis("ap_id_obj_map")

@property
def sub_inst_id__sub_inst_obj_map(self) -> Dict[int, models.SubscriptionInstanceRecord]:
return self._get_attr_from_redis("sub_inst_id__sub_inst_obj_map")

@property
def gse_api_helper(self) -> GseApiBaseHelper:
return self._get_attr_from_redis("gse_api_helper")

@property
def subscription(self) -> models.Subscription:
return self._get_attr_from_redis("subscription")

@property
def subscription_step(self) -> models.SubscriptionStep:
return self._get_attr_from_redis("subscription_step")

@property
def subscription_instances(self) -> List[models.SubscriptionInstanceRecord]:
return self._get_attr_from_redis("subscription_instances")

@property
def subscription_instance_ids(self) -> Set[int]:
return self._get_attr_from_redis("subscription_instance_ids")


class BaseService(Service, LogMixin, DBHelperMixin, PollingTimeoutMixin):

# 失败订阅实例ID - 失败原因 映射关系
failed_subscription_instance_id_reason_map: Optional[Dict[int, Any]] = None
# 日志制作类实例
log_maker: Optional[LogMaker] = None
# is_multi_paralle_gateway: bool = False

def __init__(self, *args, **kwargs):
self.failed_subscription_instance_id_reason_map: Dict = {}
Expand Down Expand Up @@ -447,7 +520,10 @@ def get_common_data(cls, data):
break

ap_id_obj_map = models.AccessPoint.ap_id_obj_map()
return CommonData(

common_data_cls = RedisCommonData if data.get_one_of_inputs("is_multi_paralle_gateway") else CommonData

return common_data_cls(
bk_host_ids=bk_host_ids,
host_id_obj_map=host_id_obj_map,
sub_inst_id__host_id_map=sub_inst_id__host_id_map,
Expand Down
4 changes: 3 additions & 1 deletion apps/backend/periodic_tasks/cache_scope_instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from apps.backend.subscription import constants, tools
from apps.node_man import models
from apps.node_man.constants import DataBackend
from apps.utils.md5 import count_md5
from apps.utils.periodic_task import calculate_countdown

Expand All @@ -31,8 +32,9 @@ def get_instances_by_scope_task(subscription_id):
f" scope_md5: {scope_md5}, scope: {subscription.scope}"
)
# 查询后会进行缓存,详见 get_instances_by_scope 的装饰器 func_cache_decorator
data_backend = DataBackend.REDIS.value if subscription.is_multi_paralle_gateway else DataBackend.MEM.value
tools.get_instances_by_scope_with_checker(
subscription.scope, subscription.steps, source="get_instances_by_scope_task"
subscription.scope, subscription.steps, source="get_instances_by_scope_task", data_backend=data_backend
)
logger.info(f"[cache_subscription_scope_instances] (subscription: {subscription_id}) end.")

Expand Down
38 changes: 30 additions & 8 deletions apps/backend/subscription/commons.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
specific language governing permissions and limitations under the License.
"""
import logging
from typing import Union

from django.core.cache import cache

Expand All @@ -18,6 +19,7 @@
from apps.prometheus import metrics
from apps.prometheus.helper import SetupObserve, get_call_resource_labels_func
from apps.utils.batch_request import batch_request
from apps.utils.redis import DynamicContainer, RedisList

logger = logging.getLogger("app")

Expand All @@ -38,7 +40,7 @@ def get_host_object_attribute(bk_biz_id):


@SetupObserve(counter=metrics.app_common_method_requests_total, get_labels_func=get_call_resource_labels_func)
def list_biz_hosts(bk_biz_id, condition, func, split_params=False):
def list_biz_hosts(bk_biz_id, condition, func, split_params=False, data_backend: str = None):
biz_custom_property = []
kwargs = {
"fields": constants.CC_HOST_FIELDS,
Expand All @@ -50,14 +52,16 @@ def list_biz_hosts(bk_biz_id, condition, func, split_params=False):
kwargs["fields"] += list(set(biz_custom_property))
kwargs["fields"] = list(set(kwargs["fields"]))
kwargs.update(condition)

hosts = batch_request(getattr(client_v2.cc, func), kwargs, split_params=split_params)
hosts = batch_request(getattr(client_v2.cc, func), kwargs, split_params=split_params, data_backend=data_backend)
# 排除掉CMDB中内网IP为空的主机
cleaned_hosts = [host for host in hosts if host.get("bk_host_innerip") or host.get("bk_host_innerip_v6")]
cleaned_hosts: Union[RedisList, list] = DynamicContainer(
return_type=constants.DCReturnType.LIST.value, data_backend=data_backend
).container
cleaned_hosts.extend([host for host in hosts if host.get("bk_host_innerip") or host.get("bk_host_innerip_v6")])
return cleaned_hosts


def get_host_by_inst(bk_biz_id, inst_list):
def get_host_by_inst(bk_biz_id, inst_list, data_backend: str = None):
"""
根据拓扑节点查询主机
:param inst_list: 实例列表
Expand All @@ -67,7 +71,9 @@ def get_host_by_inst(bk_biz_id, inst_list):
if not bk_biz_id:
raise BizNotExistError()

hosts = []
hosts: Union[RedisList, list] = DynamicContainer(
return_type=constants.DCReturnType.LIST.value, data_backend=data_backend
).container
bk_module_ids = []
bk_set_ids = []
bk_biz_ids = []
Expand All @@ -88,20 +94,35 @@ def get_host_by_inst(bk_biz_id, inst_list):
# 自定义层级
topo_cond = {"bk_obj_id": inst["bk_obj_id"], "bk_inst_id": inst["bk_inst_id"]}
hosts.extend(
list_biz_hosts(bk_biz_id, topo_cond, "find_host_by_topo", source="get_host_by_inst:find_host_by_topo")
list_biz_hosts(
bk_biz_id,
topo_cond,
"find_host_by_topo",
source="get_host_by_inst:find_host_by_topo",
data_backend=data_backend,
)
)

if bk_biz_ids:
# 业务查询
for bk_biz_id in bk_biz_ids:
hosts.extend(list_biz_hosts(bk_biz_id, {}, "list_biz_hosts", source="get_host_by_inst:list_biz_hosts:biz"))
hosts.extend(
list_biz_hosts(
bk_biz_id,
{},
"list_biz_hosts",
source="get_host_by_inst:list_biz_hosts:biz",
data_backend=data_backend,
)
)
if bk_set_ids:
# 集群查询
hosts.extend(
list_biz_hosts(
bk_biz_id,
{"set_cond": [{"field": "bk_set_id", "operator": "$in", "value": bk_set_ids}]},
"list_biz_hosts",
data_backend=data_backend,
)
)
if bk_module_ids:
Expand All @@ -113,6 +134,7 @@ def get_host_by_inst(bk_biz_id, inst_list):
"list_biz_hosts",
split_params=True,
source="get_host_by_inst:list_biz_hosts:module",
data_backend=data_backend,
)
)

Expand Down
3 changes: 3 additions & 0 deletions apps/backend/subscription/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,8 @@
# 单个任务主机数量
TASK_HOST_LIMIT = 500

# 单个并行网关的子进程数量 并行网关主机数量 1000*500
PARALLE_GATEWAY_PROCESS_LIMIT = 1000

# 订阅范围实例缓存时间,比自动下发周期多1小时
SUBSCRIPTION_SCOPE_CACHE_TIME = SUBSCRIPTION_UPDATE_INTERVAL + constants.TimeUnit.HOUR
36 changes: 30 additions & 6 deletions apps/backend/subscription/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import random
from collections import Counter, defaultdict
from copy import deepcopy
from typing import Any, Dict, List, Optional, Set
from typing import Any, Dict, List, Optional, Set, Union

from django.conf import settings
from django.core.cache import cache
Expand All @@ -29,6 +29,7 @@
from apps.node_man import constants, models
from apps.utils import concurrent
from apps.utils.basic import filter_values
from apps.utils.redis import RedisDict
from pipeline.engine.models import PipelineProcess
from pipeline.service import task_service

Expand Down Expand Up @@ -132,9 +133,18 @@ def task_result(
if not need_out_of_scope_snapshots:
# 如果不需要已不在订阅范围内的执行快照,查询订阅范围过滤掉移除的实例 ID
subscription = models.Subscription.objects.get(id=self.subscription_id)
data_backend = (
constants.DataBackend.REDIS.value
if subscription.is_multi_paralle_gateway
else constants.DataBackend.MEM.value
)
scope_instance_id_list: Set[str] = set(
tools.get_instances_by_scope_with_checker(
subscription.scope, subscription.steps, get_cache=True, source="task_result"
subscription.scope,
subscription.steps,
get_cache=True,
source="task_result",
data_backend=data_backend,
).keys()
)
base_kwargs["instance_id__in"] = scope_instance_id_list
Expand Down Expand Up @@ -510,8 +520,13 @@ def statistic(subscription_id_list: List[int]) -> List[Dict]:
sub_statistic_list: List[Dict] = []
for subscription in subscriptions:
sub_statistic = {"subscription_id": subscription.id, "status": []}
current_instances = tools.get_instances_by_scope_with_checker(
subscription.scope, subscription.steps, get_cache=True, source="statistic"
data_backend = (
constants.DataBackend.REDIS.value
if subscription.is_multi_paralle_gateway
else constants.DataBackend.MEM.value
)
current_instances: Union[RedisDict, dict] = tools.get_instances_by_scope_with_checker(
subscription.scope, subscription.steps, get_cache=True, source="statistic", data_backend=data_backend
)

status_statistic = {"SUCCESS": 0, "PENDING": 0, "FAILED": 0, "RUNNING": 0}
Expand Down Expand Up @@ -624,8 +639,17 @@ def instance_status(subscription_id_list: List[int], show_task_detail: bool) ->
result = []
for subscription in subscriptions:
subscription_result = []
current_instances = tools.get_instances_by_scope_with_checker(
subscription.scope, subscription.steps, get_cache=True, source="instance_status"
data_backend = (
constants.DataBackend.REDIS.value
if subscription.is_multi_paralle_gateway
else constants.DataBackend.MEM.value
)
current_instances: Union[RedisDict, dict] = tools.get_instances_by_scope_with_checker(
subscription.scope,
subscription.steps,
get_cache=True,
source="instance_status",
data_backend=data_backend,
)

# 对于每个instance,通过group_id找到其对应的host_status
Expand Down
Loading

0 comments on commit a816e37

Please sign in to comment.