Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEATURE] Allow arbitrary RCon commands in chat commands #678

Merged
merged 17 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 46 additions & 5 deletions rcon/api_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
from rcon.user_config.log_line_webhooks import LogLineWebhookUserConfig
from rcon.user_config.log_stream import LogStreamUserConfig
from rcon.user_config.name_kicks import NameKickUserConfig
from rcon.user_config.rcon_chat_commands import RConChatCommandsUserConfig
from rcon.user_config.rcon_connection_settings import RconConnectionSettingsUserConfig
from rcon.user_config.rcon_server_settings import RconServerSettingsUserConfig
from rcon.user_config.real_vip import RealVipUserConfig
Expand Down Expand Up @@ -81,20 +82,23 @@
def parameter_aliases(alias_to_param: Dict[str, str]):
"""Specify parameter aliases of a function. This might be useful to preserve backwards
compatibility or to handle parameters named after a Python reserved keyword.

Takes a mapping of aliases to their parameter name."""

def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
for alias, param in alias_to_param.items():
if alias in kwargs:
kwargs[param] = kwargs.pop(alias)
return func(*args, **kwargs)

wrapper._parameter_aliases = alias_to_param
return wrapper

return decorator


def get_rcon_api(credentials: ServerInfoType | None = None) -> "RconAPI":
"""Return a initialized Rcon connection to the game server

Expand Down Expand Up @@ -578,9 +582,11 @@ def get_online_mods(self) -> list[AdminUserType]:
def get_ingame_mods(self) -> list[AdminUserType]:
return ingame_mods()

@parameter_aliases({
"from": "from_",
})
@parameter_aliases(
{
"from": "from_",
}
)
def get_historical_logs(
self,
player_name: str | None = None,
Expand Down Expand Up @@ -1710,6 +1716,41 @@ def validate_chat_commands_config(
reset_to_default=reset_to_default,
)

def get_rcon_chat_commands_config(self):
return RConChatCommandsUserConfig.load_from_db()

def set_rcon_chat_commands_config(
self,
by: str,
config: dict[str, Any] | BaseUserConfig | None = None,
reset_to_default: bool = False,
**kwargs,
) -> bool:
return self._validate_user_config(
command_name=inspect.currentframe().f_code.co_name, # type: ignore
by=by,
model=RConChatCommandsUserConfig,
data=config or kwargs,
dry_run=False,
reset_to_default=reset_to_default,
)

def validate_rcon_chat_commands_config(
self,
by: str,
config: dict[str, Any] | BaseUserConfig | None = None,
reset_to_default: bool = False,
**kwargs,
) -> bool:
return self._validate_user_config(
command_name=inspect.currentframe().f_code.co_name, # type: ignore
by=by,
model=RConChatCommandsUserConfig,
data=config or kwargs,
dry_run=True,
reset_to_default=reset_to_default,
)

def get_log_stream_config(self):
return LogStreamUserConfig.load_from_db()

Expand Down
2 changes: 1 addition & 1 deletion rcon/audit.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def set_registered_mods(moderators_name_steamids: List[tuple]):
red.hset("moderators", k, v)


def online_mods(rcon=None) -> list[AdminUserType]:
def online_mods() -> list[AdminUserType]:
red = _red()
return [
json.loads(red.get(u)) for u in red.scan_iter(f"{HEARTBEAT_KEY_PREFIX}*", 1)
Expand Down
2 changes: 1 addition & 1 deletion rcon/auto_kick.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@

from pydantic import HttpUrl

from rcon.api_commands import RconAPI
from rcon.discord import send_to_discord_audit
from rcon.game_logs import on_connected
from rcon.hooks import inject_player_ids
from rcon.player_history import get_player_profile, player_has_flag
from rcon.user_config.name_kicks import NameKickUserConfig
from rcon.api_commands import RconAPI

logger = logging.getLogger(__name__)

Expand Down
222 changes: 3 additions & 219 deletions rcon/auto_settings.py
Original file line number Diff line number Diff line change
@@ -1,231 +1,15 @@
import logging
import re
import time
from datetime import datetime

import pytz

from rcon.api_commands import get_rcon_api
from rcon.audit import ingame_mods, online_mods
from rcon.commands import BrokenHllConnection, CommandFailedError
from rcon.conditions import Condition, create_condition
from rcon.rcon import do_run_commands
from rcon.user_config.auto_settings import AutoSettingsConfig
from rcon.user_config.utils import BaseUserConfig


logger = logging.getLogger(__name__)

USER_CONFIG_NAME_PATTERN = re.compile(r"set_.*_config")


def _get_current_map_metric(rcon):
try:
rcon.current_map = str(rcon.get_map())
except (CommandFailedError, BrokenHllConnection):
logger.exception("Failed to get current map")
return str(rcon.current_map)


METRICS = {
"player_count": lambda rcon: int(rcon.get_slots()["current_players"]),
"online_mods": lambda: len(online_mods()),
"ingame_mods": lambda: len(ingame_mods()),
"current_map": _get_current_map_metric,
"time_of_day": lambda tz: datetime.now(tz=tz),
}


def is_user_config_func(name: str) -> bool:
return re.match(USER_CONFIG_NAME_PATTERN, name) is not None


class BaseCondition:
def __init__(self, min=0, max=100, inverse=False, *args, **kwargs):
self.min = int(min)
self.max = int(max)
self.inverse = bool(inverse)

self.metric_name = ""
self.metric_source = "rcon"

@property
def metric_getter(self):
try:
return METRICS[self.metric_name]
except:
return None

def is_valid(self, **metric_sources):
metric_source = metric_sources[self.metric_source]
comparand = self.metric_getter(metric_source)
res = self.min <= comparand <= self.max
logger.info(
"Applying condition %s: %s <= %s <= %s = %s. Inverse: %s",
self.metric_name,
self.min,
comparand,
self.max,
res,
self.inverse,
)
if self.inverse:
return not res
else:
return res


class PlayerCountCondition(BaseCondition):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.metric_name = "player_count"


class OnlineModsCondition(BaseCondition):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.metric_name = "online_mods"
self.metric_source = None

def is_valid(self, **metric_sources):
comparand = self.metric_getter()
res = self.min <= comparand <= self.max
logger.info(
"Applying condition %s: %s <= %s <= %s = %s. Inverse: %s",
self.metric_name,
self.min,
comparand,
self.max,
res,
self.inverse,
)
if self.inverse:
return not res
else:
return res


class IngameModsCondition(OnlineModsCondition):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.metric_name = "ingame_mods"
self.metric_source = None


class CurrentMapCondition(BaseCondition):
def __init__(
self, map_names=[], inverse=False, *args, **kwargs
): # Avoid unexpected arguments
self.map_names = map_names
self.inverse = inverse
self.metric_name = "current_map"
self.metric_source = "rcon"

def is_valid(self, **metric_sources):
metric_source = metric_sources[self.metric_source]
comparand = self.metric_getter(metric_source)
res = comparand in self.map_names
logger.info(
"Applying condition %s: %s in %s = %s. Inverse: %s",
self.metric_name,
comparand,
self.map_names,
res,
self.inverse,
)
if self.inverse:
return not res
else:
return res


class TimeOfDayCondition(BaseCondition):
def __init__(
self, min="00:00", max="23:59", timezone="utc", inverse=False, *args, **kwargs
): # Avoid unexpected arguments
self.min = str(min)
self.max = str(max)
if self.max in ["24:00", "0:00"]:
self.max = "23:59"
self.inverse = bool(inverse)
if timezone.lower() == "utc":
self.tz = pytz.UTC
else:
self.tz = pytz.timezone(timezone)
self.metric_name = "time_of_day"
self.metric_source = None

def is_valid(self, **metric_sources):
try:
min_h, min_m = [int(i) for i in self.min.split(":")[:2]]
max_h, max_m = [int(i) for i in self.max.split(":")[:2]]
min = datetime.now(tz=self.tz).replace(hour=min_h, minute=min_m)
max = datetime.now(tz=self.tz).replace(hour=max_h, minute=max_m)
except:
logger.exception("Time Of Day condition is invalid and is ignored")
return False # The condition should fail
comparand = datetime.now(tz=self.tz)
res = min <= comparand <= max
logger.info(
"Applying condition %s: %s <= %s:%s <= %s = %s. Inverse: %s",
self.metric_name,
self.min,
comparand.hour,
comparand.minute,
self.max,
res,
self.inverse,
)
if self.inverse:
return not res
else:
return res


def create_condition(name, **kwargs):
kwargs["inverse"] = kwargs.get("not", False) # Using "not" would cause issues later
if name == "player_count":
return PlayerCountCondition(**kwargs)
elif name == "online_mods":
return OnlineModsCondition(**kwargs)
elif name == "ingame_mods":
return IngameModsCondition(**kwargs)
elif name == "current_map":
return CurrentMapCondition(**kwargs)
elif name == "time_of_day":
return TimeOfDayCondition(**kwargs)
else:
raise ValueError("Invalid condition type: %s" % name)


def do_run_commands(rcon, commands):
for command, params in commands.items():
try:
logger.info("Applying %s %s", command, params)

# Allow people to apply partial changes to a user config to make
# auto settings less gigantic
if is_user_config_func(command):
# super dirty we should probably make an actual look up table
# but all the names are consistent
get_config_command = f"g{command[1:]}"
config: BaseUserConfig = rcon.__getattribute__(get_config_command)()
# get the existing config, override anything set in params
merged_params = config.model_dump() | params

if "by" not in merged_params:
merged_params["by"] = "AutoSettings"

rcon.__getattribute__(command)(**merged_params)
else:
# Non user config settings
rcon.__getattribute__(command)(**params)
except AttributeError as e:
logger.exception(
"%s is not a valid command, double check the name!", command
)
except Exception as e:
logger.exception("Unable to apply %s %s: %s", command, params, e)
time.sleep(5) # go easy on the server


def run():
rcon = get_rcon_api()
Expand Down Expand Up @@ -254,7 +38,7 @@ def run():
)

for rule in config["rules"]:
conditions: list[BaseCondition] = []
conditions: list[Condition] = []
commands = rule.get("commands", {})
for c_name, c_params in rule.get("conditions", {}).items():
try:
Expand Down
1 change: 0 additions & 1 deletion rcon/automods/level_thresholds.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
from rcon.types import GameState, GetDetailedPlayer
from rcon.user_config.auto_mod_level import AutoModLevelUserConfig, Roles


LEVEL_THRESHOLDS_RESET_SECS = 120
AUTOMOD_USERNAME = "LevelThresholdsAutomod"

Expand Down
1 change: 0 additions & 1 deletion rcon/automods/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from pydantic import HttpUrl
from pydantic.dataclasses import dataclass


logger = logging.getLogger(__name__)


Expand Down
1 change: 0 additions & 1 deletion rcon/automods/no_leader.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
from rcon.types import GameState
from rcon.user_config.auto_mod_no_leader import AutoModNoLeaderUserConfig


LEADER_WATCH_RESET_SECS = 120
AUTOMOD_USERNAME = "NoLeaderWatch"

Expand Down
1 change: 0 additions & 1 deletion rcon/automods/no_solotank.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
from rcon.types import GameState
from rcon.user_config.auto_mod_solo_tank import AutoModNoSoloTankUserConfig


SOLO_TANK_RESET_SECS = 120
AUTOMOD_USERNAME = "NoSoloTank"

Expand Down
1 change: 0 additions & 1 deletion rcon/automods/seeding_rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
from rcon.types import GameState, GetDetailedPlayer, Roles
from rcon.user_config.auto_mod_seeding import AutoModSeedingUserConfig


SEEDING_RULES_RESET_SECS = 120
AUTOMOD_USERNAME = "SeedingRulesAutomod"
SEEDING_RULE_NAMES = ["disallowed_roles", "disallowed_weapons", "enforce_cap_fight"]
Expand Down
Loading