diff --git a/.env.example b/.env.example index e2848cf..80a553c 100644 --- a/.env.example +++ b/.env.example @@ -1,4 +1,5 @@ AICOACH_MONGO_DSN= AICOACH_ASSISTANT_ID= AICOACH_OPENAI_API_KEY= -AICOACH_OPENAI_ORG_ID= \ No newline at end of file +AICOACH_OPENAI_ORG_ID= +AICOACH_OBS_WS_PW= \ No newline at end of file diff --git a/Installation.md b/Installation.md index 19f2d7e..5109bcd 100644 --- a/Installation.md +++ b/Installation.md @@ -3,7 +3,7 @@ Notes on how to setup dependencies; In general, create a new env with conda: ```sh -conda env create --name aicoach311 --file=environments-cp311.yml +conda env create --file=environments-cp311.yml ``` Python 3.11 is the only version that works with all dependencies at this point. @@ -19,33 +19,24 @@ import openwakeword openwakeword.utils.download_models() ``` -## Flash attention - -https://pypi.org/project/flash-attn/ - -Set MAX_JOBS=4 if less than 100Gb of RAM - ## pytorch with CUDA Needs a CUDA capabale NVidia GPU to run fast whisper. conda install pytorch torchvision torchaudio pytorch-cuda=12.1 -c pytorch -c nvidia -## tesseract - -Install tesseract with language data. (Windows: https://github.com/UB-Mannheim/tesseract/wiki). -If installed to non-default location adjust tessdata_dir in config. - -## RealtimeTTS - -https://github.com/KoljaB/RealtimeTTS?tab=readme-ov-file - -Install manually then reinstall openai as RealtimeTTS downgrades openai on installation. +## Flash attention -After installation, remove TTS (not maintained anymore) +https://pypi.org/project/flash-attn/ -`pip uninstall TTS` +Notes: +- Get C++ build tools: https://visualstudio.microsoft.com/visual-cpp-build-tools/ + - MSVC C++ 2022 build tools latest + - Windows 11 SDK +- Get ninja: ```pip install ninja``` +- Set MAX_JOBS=4 if less than 100Gb of RAM -and instead install coqui-tts +## tesseract -`pip install coqui-tts` +Install tesseract with language data. (Windows: https://github.com/UB-Mannheim/tesseract/wiki). +If installed to non-default location adjust tessdata_dir in config. diff --git a/config.py b/config.py index 0be0bbd..3ef8d60 100644 --- a/config.py +++ b/config.py @@ -99,6 +99,7 @@ class Config(BaseSettings): sc2_client_url: str = "http://127.0.0.1:6119" screenshot: str tessdata_dir: str + obs_ws_pw: str | None season: int diff --git a/environment-cp311.yml b/environment-cp311.yml index 4f934f5..e882f93 100644 --- a/environment-cp311.yml +++ b/environment-cp311.yml @@ -42,5 +42,6 @@ dependencies: - SpeechRecognition #- flash_attn # https://pypi.org/project/flash-attn/ - pyodmongo - #- RealtimeTTS # install manually as it downgrades openai + - realtimetts[system,coqui] - keyboard + - obsws-python diff --git a/obs_client.py b/obs_client.py new file mode 100644 index 0000000..3999fca --- /dev/null +++ b/obs_client.py @@ -0,0 +1,80 @@ +from time import sleep + +import click +import obsws_python as obsws +from rich import print + +from config import config +from obs_tools.sc2client import sc2client +from obs_tools.types import Screen + + +# we set this up as a standalone process so that OBS can run and react to SC2 UI changes without the need +# to run the rest of the project. +# This will send the currently visible screen(s) in SC2 menus to OBS via the AdvancedSceneSwitcher plugin +# If ingame, it will send "In game" to OBS +# AdvancedSceneSwitcher can use these messages in macro conditions +@click.command() +@click.option("--verbose", is_flag=True) +def main(verbose): + """Monitor SC2 UI through client API and let OBS know when loading screen is active""" + + menu_screens = set([Screen.background, Screen.foreground, Screen.navigation]) + + with obsws.ReqClient( + host="localhost", port=4455, password=config.obs_ws_pw, timeout=3 + ) as obs: + resp = obs.get_version() + print(f"OBS Version: {resp.obs_version}") + + last_ui = None + while True: + ui = sc2client.get_uiinfo() + + if ui is None: + print(":warning: SC2 not running?") + sleep(5) + continue + + if ui == last_ui: + # only notify OBS on changes + sleep(0.5) + continue + + if verbose: + print(ui.activeScreens) + + if len(ui.activeScreens) == 0: + print("In game") + data = {"message": "In game"} + obs.call_vendor_request( + vendor_name="AdvancedSceneSwitcher", + request_type="AdvancedSceneSwitcherMessage", + request_data=data, + ) + elif Screen.loading in ui.activeScreens: + print(Screen.loading) + + data = {"message": Screen.loading} + obs.call_vendor_request( + vendor_name="AdvancedSceneSwitcher", + request_type="AdvancedSceneSwitcherMessage", + request_data=data, + ) + elif menu_screens < ui.activeScreens: + menues = ui.activeScreens - menu_screens + print("In menues " + str(menues)) + data = {"message": "\n".join(sorted(menues))} + obs.call_vendor_request( + vendor_name="AdvancedSceneSwitcher", + request_type="AdvancedSceneSwitcherMessage", + request_data=data, + ) + else: + pass + + last_ui = ui + + +if __name__ == "__main__": + main() diff --git a/obs_tools/sc2client.py b/obs_tools/sc2client.py index 54d8da6..02fe017 100644 --- a/obs_tools/sc2client.py +++ b/obs_tools/sc2client.py @@ -1,6 +1,7 @@ import logging import threading from time import sleep, time +from urllib.parse import urljoin import requests from blinker import signal @@ -9,7 +10,7 @@ from config import config -from .types import GameInfo, Result, ScanResult +from .types import GameInfo, Result, ScanResult, Screen, UIInfo log = logging.getLogger(f"{config.name}.{__name__}") @@ -24,15 +25,20 @@ class SC2Client: def get_gameinfo(self) -> GameInfo: try: - response = requests.get(config.sc2_client_url + "/game") - if response.status_code == 200: - try: - game = GameInfo.model_validate_json(response.text) - return game - except ValidationError as e: - log.warn(f"Invalid game data: {e}") - except ConnectionError as e: - log.warn("Could not connect to SC2 game client, is SC2 running?") + game = self._get_info("/game") + gameinfo = GameInfo.model_validate_json(game) + return gameinfo + except ValidationError as e: + log.warn(f"Invalid game data: {e}") + return None + + def get_uiinfo(self) -> UIInfo: + try: + ui = self._get_info("/ui") + uiinfo = UIInfo.model_validate_json(ui) + return uiinfo + except ValidationError as e: + log.warn(f"Invalid UI data: {e}") return None def get_opponent_name(self, gameinfo=None) -> str: @@ -44,12 +50,20 @@ def get_opponent_name(self, gameinfo=None) -> str: return player.name return None + def _get_info(self, path) -> str: + try: + response = requests.get(urljoin(config.sc2_client_url, path)) + if response.status_code == 200: + return response.text + except ConnectionError as e: + log.warn("Could not connect to SC2 game client, is SC2 running?") + return None + def wait_for_gameinfo( self, timeout: int = 20, delay: float = 0.5, ongoing=False ) -> GameInfo: start_time = time() while time() - start_time < timeout: - gameinfo = self.get_gameinfo() if ongoing: gameinfo = self.get_ongoing_gameinfo() else: @@ -102,7 +116,7 @@ def scan_client_api(self): gameinfo = sc2client.get_ongoing_gameinfo() - if self.is_live_game(gameinfo): + if is_live_game(gameinfo): if gameinfo == self.last_gameinfo: # same ongoing game, just later in time if gameinfo.displayTime >= self.last_gameinfo.displayTime: @@ -117,13 +131,14 @@ def scan_client_api(self): loading_screen.send(self, scanresult=scanresult) sleep(1) - def is_live_game(self, gameinfo): - return ( - gameinfo - and gameinfo.displayTime > 0 - and gameinfo.players[0].result == Result.undecided - and not gameinfo.isReplay - ) + +def is_live_game(gameinfo: GameInfo) -> bool: + return ( + gameinfo + and gameinfo.displayTime > 0 + and gameinfo.players[0].result == Result.undecided + and not gameinfo.isReplay + ) if __name__ == "__main__": diff --git a/obs_tools/types.py b/obs_tools/types.py index f048d66..174c3f1 100644 --- a/obs_tools/types.py +++ b/obs_tools/types.py @@ -4,6 +4,23 @@ from pydantic import BaseModel +class Screen(str, Enum): + loading = "ScreenLoading/ScreenLoading" + score = "ScreenScore/ScreenScore" + home = "ScreenHome/ScreenHome" + background = "ScreenBackgroundSC2/ScreenBackgroundSC2" + foreground = "ScreenForegroundSC2/ScreenForegroundSC2" + navigation = "ScreenNavigationSC2/ScreenNavigationSC2" + userprofile = "ScreenUserProfile/ScreenUserProfile" + multiplayer = "ScreenMultiplayer/ScreenMultiplayer" + single = "ScreenSingle/ScreenSingle" + collection = "ScreenCollection/ScreenCollection" + coopcampaign = "ScreenCoopCampaign/ScreenCoopCampaign" + custom = "ScreenCustom/ScreenCustom" + replay = "ScreenReplay/ScreenReplay" + battlelobby = "ScreenBattleLobby/ScreenBattleLobby" + + class ScanResult(BaseModel): mapname: str opponent: str @@ -82,3 +99,7 @@ def is_decided(self) -> bool: and len(self.players) > 0 and all(player.result != Result.undecided for player in self.players) ) + + +class UIInfo(BaseModel): + activeScreens: set[Screen] diff --git a/replays/sc2readerplugins/statistics.py b/replays/sc2readerplugins/statistics.py index 8c2a477..cc0384e 100644 --- a/replays/sc2readerplugins/statistics.py +++ b/replays/sc2readerplugins/statistics.py @@ -27,7 +27,8 @@ def loserDoesGG(replay): loser_sids = [p.sid for p in replay.players if p.result == "Loss"] loser_messages = [m for m in replay.messages if m.pid in loser_sids] return any( - levenshtein(m.text.lower(), g) < 2 and m.text.lower() != "bg" + set((m.text.lower())) - set("g") == set() + or (levenshtein(m.text.lower(), g) < 2 and m.text.lower() != "bg") for g in GGS for m in loser_messages ) diff --git a/tests/integration/test_obs_tools.py b/tests/integration/test_obs_tools.py index fc58e26..0cce3a3 100644 --- a/tests/integration/test_obs_tools.py +++ b/tests/integration/test_obs_tools.py @@ -12,9 +12,7 @@ # SC2 must be running and a game must be in progress or have been played recently -# or use https://github.com/leigholiver/sc2apiemulator -# SC2 must not be running: -# docker run -d --rm -p6119:80 --name sc2api leigholiver/sc2api +# or use https://github.com/manuelseeger/sc2apiemulator def test_sc2client_get_opponent(): client = SC2Client() diff --git a/tests/unit/test_sc2client.py b/tests/unit/test_sc2client.py new file mode 100644 index 0000000..b53f2e5 --- /dev/null +++ b/tests/unit/test_sc2client.py @@ -0,0 +1,29 @@ +from obs_tools.types import Screen, UIInfo + + +def test_uiinfo_equality_loading(): + ui1 = UIInfo(activeScreens=[Screen.loading]) + ui2 = UIInfo(activeScreens=[Screen.loading]) + + assert ui1 == ui2 + + +def test_uiinfo_equality_menus(): + ui1 = UIInfo( + activeScreens=[ + Screen.background, + Screen.foreground, + Screen.navigation, + Screen.home, + ] + ) + ui2 = UIInfo( + activeScreens=[ + Screen.home, + Screen.background, + Screen.foreground, + Screen.navigation, + ] + ) + + assert ui1 == ui2