Skip to content

Commit

Permalink
feat: 超大订阅方案 (closed #2429)
Browse files Browse the repository at this point in the history
  • Loading branch information
wyyalt committed Nov 11, 2024
1 parent a816e37 commit 24a4df8
Showing 1 changed file with 96 additions and 97 deletions.
193 changes: 96 additions & 97 deletions apps/backend/subscription/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -951,93 +951,91 @@ def get_instances_by_scope(

need_register = scope.get("need_register", False)

with DynamicContainer(
return_type=constants.DCReturnType.LIST.value, data_backend=data_backend
).container as instances:
# 按照拓扑查询
if scope["node_type"] == models.Subscription.NodeType.TOPO:
if scope["object_type"] == models.Subscription.ObjectType.HOST:
instances.extend([{"host": inst} for inst in get_host_relation(bk_biz_id, nodes, data_backend)])
else:
# 补充服务实例中的信息
instances.extend(
[
{"service": inst}
for inst in get_service_instance_by_inst(
bk_biz_id, nodes, module_to_topo, data_backend=data_backend
)
]
)
instances = DynamicContainer(return_type=constants.DCReturnType.LIST.value, data_backend=data_backend).container
# 按照拓扑查询
if scope["node_type"] == models.Subscription.NodeType.TOPO:
if scope["object_type"] == models.Subscription.ObjectType.HOST:
instances.extend([{"host": inst} for inst in get_host_relation(bk_biz_id, nodes, data_backend)])
else:
# 补充服务实例中的信息
instances.extend(
[
{"service": inst}
for inst in get_service_instance_by_inst(
bk_biz_id, nodes, module_to_topo, data_backend=data_backend
)
]
)

# 按照实例查询
elif scope["node_type"] == models.Subscription.NodeType.INSTANCE:
if scope["object_type"] == models.Subscription.ObjectType.HOST:
host_detail: Union[RedisList, list] = get_host_detail(
nodes, bk_biz_id=bk_biz_id, source="get_instances_by_scope", data_backend=data_backend
)
instances.extend([{"host": inst} for inst in host_detail])
else:
service_instance_ids = [int(node["id"]) for node in nodes]
instances.extend(
[
{"service": inst}
for inst in get_service_instances(
bk_biz_id=bk_biz_id,
filter_id_list=service_instance_ids,
filter_field_name=FilterFieldName.SERVICE_INSTANCE_IDS,
ignore_exception=False,
data_backend=data_backend,
)
]
)
# 按照实例查询
elif scope["node_type"] == models.Subscription.NodeType.INSTANCE:
if scope["object_type"] == models.Subscription.ObjectType.HOST:
host_detail: Union[RedisList, list] = get_host_detail(
nodes, bk_biz_id=bk_biz_id, source="get_instances_by_scope", data_backend=data_backend
)
instances.extend([{"host": inst} for inst in host_detail])
else:
service_instance_ids = [int(node["id"]) for node in nodes]
instances.extend(
[
{"service": inst}
for inst in get_service_instances(
bk_biz_id=bk_biz_id,
filter_id_list=service_instance_ids,
filter_field_name=FilterFieldName.SERVICE_INSTANCE_IDS,
ignore_exception=False,
data_backend=data_backend,
)
]
)

# 按照模板查询
elif scope["node_type"] in [
models.Subscription.NodeType.SERVICE_TEMPLATE,
models.Subscription.NodeType.SET_TEMPLATE,
]:
# 校验是否都选择了同一种模板
bk_obj_id_set = check_instances_object_type(nodes)
if scope["object_type"] == models.Subscription.ObjectType.HOST:
# 补充实例所属模块ID
host_biz_relations = Union[RedisList, list] = DynamicContainer(
return_type=constants.DCReturnType.LIST.value, data_backend=data_backend
).container
instances.extend(
[
{"host": inst}
for inst in get_host_detail_by_template(
list(bk_obj_id_set)[0], nodes, bk_biz_id=bk_biz_id, data_backend=data_backend
)
]
)
bk_host_id_chunks = chunk_lists([instance["host"]["bk_host_id"] for instance in instances], 500)
with ThreadPoolExecutor(max_workers=settings.CONCURRENT_NUMBER) as ex:
tasks = [
ex.submit(client_v2.cc.find_host_biz_relations, dict(bk_host_id=chunk, bk_biz_id=bk_biz_id))
for chunk in bk_host_id_chunks
]
for future in as_completed(tasks):
host_biz_relations.extend(future.result())

# 转化模板为节点
nodes = set_template_scope_nodes(scope)
instances: Union[RedisList, list] = add_host_module_info(
host_biz_relations, instances, data_backend=data_backend
)
# 按照模板查询
elif scope["node_type"] in [
models.Subscription.NodeType.SERVICE_TEMPLATE,
models.Subscription.NodeType.SET_TEMPLATE,
]:
# 校验是否都选择了同一种模板
bk_obj_id_set = check_instances_object_type(nodes)
if scope["object_type"] == models.Subscription.ObjectType.HOST:
# 补充实例所属模块ID
host_biz_relations = Union[RedisList, list] = DynamicContainer(
return_type=constants.DCReturnType.LIST.value, data_backend=data_backend
).container
instances.extend(
[
{"host": inst}
for inst in get_host_detail_by_template(
list(bk_obj_id_set)[0], nodes, bk_biz_id=bk_biz_id, data_backend=data_backend
)
]
)
bk_host_id_chunks = chunk_lists([instance["host"]["bk_host_id"] for instance in instances], 500)
with ThreadPoolExecutor(max_workers=settings.CONCURRENT_NUMBER) as ex:
tasks = [
ex.submit(client_v2.cc.find_host_biz_relations, dict(bk_host_id=chunk, bk_biz_id=bk_biz_id))
for chunk in bk_host_id_chunks
]
for future in as_completed(tasks):
host_biz_relations.extend(future.result())

else:
# 补充服务实例中的信息
# 转化模板为节点,**注意不可在get_service_instance_by_inst之后才转换**
nodes = set_template_scope_nodes(scope)
instances.extend(
[
{"service": inst}
for inst in get_service_instance_by_inst(
bk_biz_id, nodes, module_to_topo, data_backend=data_backend
)
]
)
# 转化模板为节点
nodes = set_template_scope_nodes(scope)
instances: Union[RedisList, list] = add_host_module_info(
host_biz_relations, instances, data_backend=data_backend
)

else:
# 补充服务实例中的信息
# 转化模板为节点,**注意不可在get_service_instance_by_inst之后才转换**
nodes = set_template_scope_nodes(scope)
instances.extend(
[
{"service": inst}
for inst in get_service_instance_by_inst(
bk_biz_id, nodes, module_to_topo, data_backend=data_backend
)
]
)

if not need_register:
# 补充必要的主机或实例相关信息
Expand Down Expand Up @@ -1110,20 +1108,21 @@ def add_host_info_to_instances(bk_biz_id: int, scope: Dict, instances: RedisList
# 非服务实例,不需要补充实例主机信息
return redis_instances if is_redis_data_backend else instances

with DynamicContainer(data_backend=data_backend).container as host_dict:
hosts_detail: Union[RedisList, list] = get_host_detail(
[instance["service"] for instance in instances],
bk_biz_id=bk_biz_id,
source="add_host_info_to_instances",
data_backend=data_backend,
)
for host_info in hosts_detail:
host_dict[host_info["bk_host_id"]] = host_info
host_dict = DynamicContainer(data_backend=data_backend).container

hosts_detail: Union[RedisList, list] = get_host_detail(
[instance["service"] for instance in instances],
bk_biz_id=bk_biz_id,
source="add_host_info_to_instances",
data_backend=data_backend,
)
for host_info in hosts_detail:
host_dict[host_info["bk_host_id"]] = host_info

for instance in instances:
instance["host"] = host_dict[instance["service"]["bk_host_id"]]
if is_redis_data_backend:
redis_instances.append(instance)
for instance in instances:
instance["host"] = host_dict[instance["service"]["bk_host_id"]]
if is_redis_data_backend:
redis_instances.append(instance)

return redis_instances if is_redis_data_backend else instances

Expand Down

0 comments on commit 24a4df8

Please sign in to comment.