Skip to content

Commit

Permalink
#337 开拓力app重构中
Browse files Browse the repository at this point in the history
  • Loading branch information
DoctorReid committed Jun 20, 2024
1 parent afdfb00 commit 9f62428
Show file tree
Hide file tree
Showing 6 changed files with 150 additions and 132 deletions.
6 changes: 3 additions & 3 deletions install.bat
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ echo 准备在当前目录安装Python 3.11
set bit=!PROCESSOR_ARCHITECTURE!
echo 当前系统为 !bit!

set py_url=https://www.python.org/ftp/python/3.11.3/python-3.11.3-embed-win32.zip
set py_file_name=!env_path!\python-3.11.3-embed.zip
set py_url=https://www.python.org/ftp/python/3.11.9/python-3.11.9-embed-win32.zip
set py_file_name=!env_path!\python-3.11.9-embed.zip
if "!bit!"=="AMD64" (
set py_url=https://www.python.org/ftp/python/3.11.3/python-3.11.3-embed-amd64.zip
set py_url=https://www.python.org/ftp/python/3.11.9/python-3.11.9-embed-amd64.zip
)

if not exist "!env_path!" mkdir "!env_path!"
Expand Down
172 changes: 66 additions & 106 deletions src/sr/app/trailblaze_power/trailblaze_power_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,19 @@
from sr.app.sim_uni.sim_uni_app import SimUniApp
from sr.app.trailblaze_power.trailblaze_power_config import TrailblazePowerPlanItem
from sr.const import phone_menu_const
from sr.const.map_const import TransportPoint
from sr.context import Context
from sr.image.sceenshot import large_map
from sr.operation import StateOperationNode, StateOperationEdge, OperationOneRoundResult, Operation
from sr.interastral_peace_guide.survival_index_mission import SurvivalIndexMission, \
SurvivalIndexMissionEnum
from sr.operation import StateOperationNode, StateOperationEdge, OperationOneRoundResult
from sr.operation.combine.use_trailblaze_power import UseTrailblazePower
from sr.operation.common.back_to_normal_world_plus import BackToNormalWorldPlus
from sr.operation.common.cancel_mission_trace import CancelMissionTrace
from sr.operation.unit.guide import GuideTabEnum
from sr.operation.unit.guide.choose_guide_tab import ChooseGuideTab
from sr.interastral_peace_guide.survival_index_mission import SurvivalIndexCategoryEnum, SurvivalIndexMission, \
SurvivalIndexMissionEnum
from sr.operation.unit.menu.click_phone_menu_item import ClickPhoneMenuItem
from sr.operation.unit.menu.open_phone_menu import OpenPhoneMenu
from sr.operation.unit.open_map import OpenMap
from sr.sim_uni.sim_uni_const import SimUniWorld, OrnamentExtraction


class TrailblazePower(Application):
Expand All @@ -32,7 +32,10 @@ class TrailblazePower(Application):

STATUS_NORMAL_TASK: ClassVar[str] = '普通副本'
STATUS_SIM_UNI_TASK: ClassVar[str] = '模拟宇宙'
STATUS_OE_TASK: ClassVar[str] = '饰品提取'
STATUS_NO_ENOUGH_POWER: ClassVar[str] = '体力不足'
STATUS_NO_PLAN: ClassVar[str] = '没有开拓力计划'
STATUS_WITH_PLAN: ClassVar[str] = '有开拓力计划'
STATUS_PLAN_FINISHED: ClassVar[str] = '完成计划'

def __init__(self, ctx: Context):
Expand All @@ -46,38 +49,27 @@ def __init__(self, ctx: Context):
check_task = StateOperationNode('检查当前需要挑战的关卡', self._check_task)
edges.append(StateOperationEdge(cancel_trace, check_task))

check_normal_power = StateOperationNode('检查剩余开拓力', self._check_power_for_normal)
edges.append(StateOperationEdge(check_task, check_normal_power, status=TrailblazePower.STATUS_NORMAL_TASK))

challenge_normal = StateOperationNode('挑战普通副本', self._challenge_normal_task)
edges.append(StateOperationEdge(check_normal_power, challenge_normal))
edges.append(StateOperationEdge(challenge_normal, check_task)) # 循环挑战

check_sim_uni_power = StateOperationNode('检查剩余沉浸器', self._check_power_for_sim_uni)
edges.append(StateOperationEdge(check_task, check_sim_uni_power, status=TrailblazePower.STATUS_SIM_UNI_TASK))
check_power = StateOperationNode('检查剩余开拓力', self._check_power)
edges.append(StateOperationEdge(check_task, check_power, status=TrailblazePower.STATUS_WITH_PLAN))

challenge_sim_uni = StateOperationNode('挑战模拟宇宙', self._challenge_sim_uni)
edges.append(StateOperationEdge(check_sim_uni_power, challenge_sim_uni))
edges.append(StateOperationEdge(challenge_sim_uni, check_task))
execute = StateOperationNode('执行开拓力计划', self._execute_plan)
edges.append(StateOperationEdge(check_power, execute))
edges.append(StateOperationEdge(execute, check_task)) # 循环挑战

back = StateOperationNode('完成后返回大世界', op=BackToNormalWorldPlus(ctx))
edges.append(StateOperationEdge(challenge_normal, back, status=TrailblazePower.STATUS_NO_ENOUGH_POWER))
edges.append(StateOperationEdge(challenge_normal, back, status=TrailblazePower.STATUS_PLAN_FINISHED))
edges.append(StateOperationEdge(challenge_sim_uni, back, status=TrailblazePower.STATUS_NO_ENOUGH_POWER))
edges.append(StateOperationEdge(challenge_sim_uni, back, status=TrailblazePower.STATUS_PLAN_FINISHED))
edges.append(StateOperationEdge(execute, back, status=TrailblazePower.STATUS_NO_ENOUGH_POWER))
edges.append(StateOperationEdge(execute, back, status=TrailblazePower.STATUS_PLAN_FINISHED))

super().__init__(ctx, try_times=5,
op_name=gt('开拓力', 'ui'),
edges=edges,
run_record=ctx.tp_run_record)
self.power: Optional[int] = None # 剩余开拓力
self.qty: Optional[int] = None # 沉浸器数量
self.last_challenge_point: Optional[SurvivalIndexMission] = None

def _init_before_execute(self):
super()._init_before_execute()
self.last_challenge_point = None
self.power = None
self.last_mission: Optional[SurvivalIndexMission] = None # 上一个挑战副本
self.power: int = 0 # 剩余开拓力
self.qty: int = 0 # 沉浸器数量

def _check_task(self) -> OperationOneRoundResult:
"""
Expand All @@ -88,83 +80,22 @@ def _check_task(self) -> OperationOneRoundResult:
plan: Optional[TrailblazePowerPlanItem] = self.ctx.tp_config.next_plan_item

if plan is None:
return self.round_success()

point: Optional[SurvivalIndexMission] = SurvivalIndexMissionEnum.get_by_unique_id(plan['mission_id'])
if point.cate == SurvivalIndexCategoryEnum.SI_SIM_UNI.value:
return self.round_success(TrailblazePower.STATUS_SIM_UNI_TASK)
else:
return self.round_success(TrailblazePower.STATUS_NORMAL_TASK)
return self.round_success(status=TrailblazePower.STATUS_NO_PLAN)

def _check_power_for_normal(self) -> OperationOneRoundResult:
"""
普通副本 在大地图上看剩余体力
:return:
"""
if self.power is not None: # 之前已经检测过了
return self.round_success()

op = OpenMap(self.ctx)
op_result = op.execute()
if not op_result.success:
return self.round_retry('打开大地图失败')

screen: MatLike = self.screenshot()
part = cv2_utils.crop_image_only(screen, large_map.LARGE_MAP_POWER_RECT)
ocr_result = self.ctx.ocr.ocr_for_single_line(part, strict_one_line=True)
self.power = str_utils.get_positive_digits(ocr_result, err=None)
if self.power is None:
return self.round_retry('检测剩余开拓力失败', wait=1)
else:
log.info('识别当前开拓力 %d', self.power)
return self.round_success()
return self.round_success(status=TrailblazePower.STATUS_WITH_PLAN)

def _challenge_normal_task(self) -> OperationOneRoundResult:
"""
挑战普通副本
:return:
"""
plan: Optional[TrailblazePowerPlanItem] = self.ctx.tp_config.next_plan_item
point: Optional[SurvivalIndexMission] = SurvivalIndexMissionEnum.get_by_unique_id(plan['mission_id'])
run_times: int = self.power // point.power
if run_times == 0:
return self.round_success(TrailblazePower.STATUS_NO_ENOUGH_POWER)
if run_times + plan['run_times'] > plan['plan_times']:
run_times = plan['plan_times'] - plan['run_times']
if run_times == 0:
return self.round_success(TrailblazePower.STATUS_PLAN_FINISHED)

op = UseTrailblazePower(self.ctx, point, plan['team_num'], run_times,
support=plan['support'] if plan['support'] != 'none' else None,
on_battle_success=self._on_normal_task_success,
need_transport=point != self.last_challenge_point)

op_result = op.execute()
if op_result.success:
self.last_challenge_point = point
return self.round_by_op(op_result)

def _on_normal_task_success(self, finished_times: int, use_power: int):
def _check_power(self) -> OperationOneRoundResult:
"""
普通副本获取一次奖励时候的回调
:param finished_times: 完成次数
:param use_power: 使用的体力
识别开拓力和沉浸器
:return:
"""
log.info('挑战成功 完成次数 %d 使用体力 %d', finished_times, use_power)
self.power -= use_power
plan: Optional[TrailblazePowerPlanItem] = self.ctx.tp_config.next_plan_item
plan['run_times'] += finished_times
self.ctx.tp_config.save()

def _check_power_for_sim_uni(self) -> OperationOneRoundResult:
if self.qty is not None:
return self.round_success()

ops = [
OpenPhoneMenu(self.ctx),
ClickPhoneMenuItem(self.ctx, phone_menu_const.INTERASTRAL_GUIDE),
ChooseGuideTab(self.ctx, GuideTabEnum.TAB_3.value)
ChooseGuideTab(self.ctx, GuideTabEnum.TAB_2.value)
]

for op in ops:
Expand All @@ -173,17 +104,18 @@ def _check_power_for_sim_uni(self) -> OperationOneRoundResult:
return self.round_by_op(op_result)

screen = self.screenshot()
x, y = self._get_sim_uni_power_and_qty(screen)
x, y = self._get_power_and_qty(screen)

if x is None or y is None:
return self.round_retry('检测开拓力和沉浸器数量失败', wait=1)

log.info('检测当前体力 %d 沉浸器数量 %d', x, y)
self.power = x
self.qty = y

return self.round_success()

def _get_sim_uni_power_and_qty(self, screen: MatLike) -> Tuple[int, int]:
def _get_power_and_qty(self, screen: MatLike) -> Tuple[int, int]:
"""
获取开拓力和沉浸器数量
:param screen: 屏幕截图
Expand All @@ -199,27 +131,55 @@ def _get_sim_uni_power_and_qty(self, screen: MatLike) -> Tuple[int, int]:

return power, qty

def _challenge_sim_uni(self) -> OperationOneRoundResult:
def _execute_plan(self) -> OperationOneRoundResult:
plan: Optional[TrailblazePowerPlanItem] = self.ctx.tp_config.next_plan_item
point: Optional[SurvivalIndexMission] = SurvivalIndexMissionEnum.get_by_unique_id(plan['mission_id'])
run_times: int = self.power // point.power + self.qty
if run_times == 0:
mission: Optional[SurvivalIndexMission] = SurvivalIndexMissionEnum.get_by_unique_id(plan['mission_id'])
can_run_times: int = self.power // mission.power
if isinstance(mission.tp, (SimUniWorld, OrnamentExtraction)): # 模拟宇宙相关的增加沉浸器数量
can_run_times += self.qty
if can_run_times == 0:
return self.round_success(TrailblazePower.STATUS_NO_ENOUGH_POWER)
if run_times + plan['run_times'] > plan['plan_times']:

if can_run_times + plan['run_times'] > plan['plan_times']:
run_times = plan['plan_times'] - plan['run_times']
else:
run_times = can_run_times
if run_times == 0:
return self.round_success(TrailblazePower.STATUS_PLAN_FINISHED)

self.ctx.sim_uni_run_record.check_and_update_status()
op = SimUniApp(self.ctx,
specified_uni_num=point.tp.idx,
max_reward_to_get=run_times,
get_reward_callback=self._on_sim_uni_get_reward
)
op.init_context_before_start = False
op.stop_context_after_stop = False

if isinstance(mission.tp, TransportPoint):
op = UseTrailblazePower(self.ctx, mission, plan['team_num'], run_times,
support=plan['support'] if plan['support'] != 'none' else None,
on_battle_success=self._on_normal_task_success,
need_transport=mission != self.last_mission)
elif isinstance(mission.tp, SimUniWorld):
op = SimUniApp(self.ctx,
specified_uni_num=mission.tp.idx,
max_reward_to_get=can_run_times,
get_reward_callback=self._on_sim_uni_get_reward
)
op.init_context_before_start = False
op.stop_context_after_stop = False
elif isinstance(mission.tp, OrnamentExtraction):
pass
self.last_mission = mission
return self.round_by_op(op.execute())

def _on_normal_task_success(self, finished_times: int, use_power: int):
"""
普通副本获取一次奖励时候的回调
:param finished_times: 完成次数
:param use_power: 使用的体力
:return:
"""
log.info('挑战成功 完成次数 %d 使用体力 %d', finished_times, use_power)
self.power -= use_power
plan: Optional[TrailblazePowerPlanItem] = self.ctx.tp_config.next_plan_item
plan['run_times'] += finished_times
self.ctx.tp_config.save()

def _on_sim_uni_get_reward(self, use_power: int, user_qty: int):
"""
模拟宇宙 获取沉浸奖励后的回调
Expand Down
51 changes: 51 additions & 0 deletions src/sr/interastral_peace_guide/ornamenet_extraction.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
from typing import List

from sr.const import phone_menu_const
from sr.context import Context
from sr.image.sceenshot import screen_state
from sr.image.sceenshot.screen_state_enum import ScreenState
from sr.interastral_peace_guide.survival_index_mission import SurvivalIndexCategoryEnum
from sr.operation import StateOperation, StateOperationEdge, OperationOneRoundResult, StateOperationNode
from sr.operation.unit.guide import GuideTabEnum
from sr.operation.unit.guide.choose_guide_tab import ChooseGuideTab
from sr.operation.unit.guide.survival_index import SurvivalIndexChooseCategory
from sr.operation.unit.menu.click_phone_menu_item import ClickPhoneMenuItem
from sr.operation.unit.menu.open_phone_menu import OpenPhoneMenu
from sr.screen_area.screen_normal_world import ScreenNormalWorld


class ChallengeOrnamentExtraction(StateOperation):

def __init__(self, ctx: Context):
edges: List[StateOperationEdge] = []

_check_screen = StateOperationNode('识别画面', self.check_screen)

_open_menu = StateOperationNode('打开菜单', op=OpenPhoneMenu(ctx))
edges.append(StateOperationEdge(_check_screen, _open_menu, status=ScreenNormalWorld.CHARACTER_ICON.value.status))

_open_guide = StateOperationNode('打开指南', op=ClickPhoneMenuItem(ctx, phone_menu_const.INTERASTRAL_GUIDE))
edges.append(StateOperationEdge(_open_menu, _open_guide))

_choose_survival_index = StateOperationNode('选择生存索引', op=ChooseGuideTab(ctx, GuideTabEnum.TAB_2.value))
edges.append(StateOperationEdge(_open_guide, _choose_survival_index))
edges.append(StateOperationEdge(_check_screen, _choose_survival_index, status=ScreenState.GUIDE.value.status))

_choose_oe = StateOperationNode('选择饰品提取', op=SurvivalIndexChooseCategory(ctx, SurvivalIndexCategoryEnum.ORNAMENT_EXTRACTION.value, skip_wait=True))
edges.append(StateOperationEdge(_choose_survival_index, _choose_oe))

_choose_diff = StateOperationNode('选择难度', )

def check_screen(self) -> OperationOneRoundResult:
"""
识别当前画面
:return:
"""
screen = self.screenshot()
if screen_state.is_normal_in_world(screen, self.ctx.im):
return self.round_success(ScreenNormalWorld.CHARACTER_ICON.value.status)

if screen_state.in_secondary_ui(screen, self.ctx.ocr, ScreenState.GUIDE.value):
return self.round_success(ScreenState.GUIDE.value.status)

return self.round_success()
17 changes: 3 additions & 14 deletions src/sr/interastral_peace_guide/survival_index_mission.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from sr.image.sceenshot.screen_state_enum import ScreenState
from sr.screen_area import ScreenArea
from sr.screen_area.interastral_peace_guide import ScreenGuide
from sr.sim_uni.sim_uni_const import SimUniWorld, SimUniWorldEnum, OrnamentExtractionEnum
from sr.sim_uni.sim_uni_const import SimUniWorld, SimUniWorldEnum, OrnamentExtractionEnum, OrnamentExtraction


class SurvivalIndexCategory:
Expand Down Expand Up @@ -66,25 +66,14 @@ class SurvivalIndexSubCategoryEnum(Enum):
class SurvivalIndexMission:

def __init__(self, cate: SurvivalIndexCategory,
tp: Union[TransportPoint, SimUniWorld],
tp: Union[TransportPoint, SimUniWorld, OrnamentExtraction],
power: int,
sub_cate: Optional[SurvivalIndexSubCategory] = None):
self.cate: SurvivalIndexCategory = cate
self.sub_cate: Optional[SurvivalIndexSubCategory] = sub_cate
self.tp: Union[TransportPoint, SimUniWorld] = tp
self.tp: Union[TransportPoint, SimUniWorld, OrnamentExtraction] = tp
self.power: int = power

@property
def survival_index_cn(self) -> str:
"""
在生存索引中显示的中文名称
:return:
"""
if self.sub_cate is not None:
return self.tp.cn[:4] + '·' + self.sub_cate.area.text
else:
return self.tp.cn

@property
def ui_cn(self) -> str:
"""
Expand Down
Loading

0 comments on commit 9f62428

Please sign in to comment.