diff --git a/discord/bot.py b/discord/bot.py index 18683fa860..26e501e4af 100644 --- a/discord/bot.py +++ b/discord/bot.py @@ -29,6 +29,7 @@ import collections import copy import inspect +import logging import sys import traceback from abc import ABC, abstractmethod @@ -43,6 +44,7 @@ Type, TypeVar, Union, + Literal, ) from .client import Client @@ -74,6 +76,8 @@ "AutoShardedBot", ) +_log = logging.getLogger(__name__) + class ApplicationCommandMixin(ABC): """A mixin that implements common functionality for classes that need @@ -208,7 +212,7 @@ def get_application_command( return return command - async def get_desynced_commands(self, guild_id: Optional[int] = None) -> List[Dict[str, Any]]: + async def get_desynced_commands(self, guild_id: Optional[int] = None, prefetched=None) -> List[Dict[str, Any]]: """|coro| Gets the list of commands that are desynced from discord. If ``guild_id`` is specified, it will only return @@ -225,6 +229,8 @@ async def get_desynced_commands(self, guild_id: Optional[int] = None) -> List[Di ---------- guild_id: Optional[:class:`int`] The guild id to get the desynced commands for, else global commands if unspecified. + prefetched + If you already fetched the commands, you can pass them here to be used. Not recommended for typical usage. Returns ------- @@ -233,68 +239,97 @@ async def get_desynced_commands(self, guild_id: Optional[int] = None) -> List[Di respectively contain the command and the action to perform. Other keys may also be present depending on the action, including ``id``. """ + # We can suggest the user to upsert, edit, delete, or bulk upsert the commands + def _check_command(cmd: ApplicationCommand, match: Dict) -> bool: + if isinstance(cmd, SlashCommandGroup): + if len(cmd.subcommands) != len(match.get("options", [])): + return True + for i, subcommand in enumerate(cmd.subcommands): + match_ = next( + ( + data + for data in match["options"] + if data["name"] == subcommand.name + ), + MISSING, + ) + if match_ is not MISSING and _check_command(subcommand, match_): + return True + else: + as_dict = cmd.to_dict() + to_check = { + "default_permission": None, + "name": None, + "description": None, + "name_localizations": None, + "description_localizations": None, + "options": ["type", "name", "description", "autocomplete", "choices", "name_localizations", + "description_localizations"], + } + for check, value in to_check.items(): + if type(to_check[check]) == list: + # We need to do some falsy conversion here + # The API considers False (autocomplete) and [] (choices) to be falsy values + falsy_vals = (False, []) + for opt in value: + cmd_vals = ( + [val.get(opt, MISSING) for val in as_dict[check]] + if check in as_dict + else [] + ) + for i, val in enumerate(cmd_vals): + if val in falsy_vals: + cmd_vals[i] = MISSING + if match.get(check, MISSING) is not MISSING and cmd_vals != [ + val.get(opt, MISSING) for val in match[check] + ]: + # We have a difference + return True + elif getattr(cmd, check) != match.get(check): + # We have a difference + if check == "default_permission" and getattr(cmd, check) is True and match.get(check) is None: + # This is a special case + # TODO: Remove for perms v2 + continue + return True + return False + return_value = [] cmds = self.pending_application_commands.copy() if guild_id is None: - registered_commands = await self._bot.http.get_global_commands(self._bot.user.id) + if prefetched is not None: + registered_commands = prefetched + else: + registered_commands = await self._bot.http.get_global_commands(self.user.id) pending = [cmd for cmd in cmds if cmd.guild_ids is None] else: - registered_commands = await self._bot.http.get_guild_commands(self._bot.user.id, guild_id) + if prefetched is not None: + registered_commands = prefetched + else: + registered_commands = await self._bot.http.get_guild_commands(self.user.id, guild_id) pending = [cmd for cmd in cmds if cmd.guild_ids is not None and guild_id in cmd.guild_ids] registered_commands_dict = {cmd["name"]: cmd for cmd in registered_commands} - to_check = { - "default_permission": None, - "name": None, - "description": None, - "options": ["type", "name", "description", "autocomplete", "choices"], - } # First let's check if the commands we have locally are the same as the ones on discord for cmd in pending: match = registered_commands_dict.get(cmd.name) if match is None: # We don't have this command registered return_value.append({"command": cmd, "action": "upsert"}) - continue - - as_dict = cmd.to_dict() - - for check, value in to_check.items(): - if type(to_check[check]) == list: - # We need to do some falsy conversion here - # The API considers False (autocomplete) and [] (choices) to be falsy values - falsy_vals = (False, []) - for opt in value: - - cmd_vals = [val.get(opt, MISSING) for val in as_dict[check]] if check in as_dict else [] - for i, val in enumerate(cmd_vals): - if val in falsy_vals: - cmd_vals[i] = MISSING - if match.get(check, MISSING) is not MISSING and cmd_vals != [ - val.get(opt, MISSING) for val in match[check] - ]: - # We have a difference - return_value.append( - { - "command": cmd, - "action": "edit", - "id": int(registered_commands_dict[cmd.name]["id"]), - } - ) - break - elif getattr(cmd, check) != match[check]: - # We have a difference - return_value.append( - { - "command": cmd, - "action": "edit", - "id": int(registered_commands_dict[cmd.name]["id"]), - } - ) - break + elif _check_command(cmd, match): + return_value.append( + { + "command": cmd, + "action": "edit", + "id": int(registered_commands_dict[cmd.name]["id"]), + } + ) + else: + # We have this command registered but it's the same + return_value.append({"command": cmd, "action": None, "id": int(match["id"])}) # Now let's see if there are any commands on discord that we need to delete for cmd, value_ in registered_commands_dict.items(): @@ -344,10 +379,12 @@ async def register_command( raise NotImplementedError("This function has not been implemented yet") async def register_commands( - self, - commands: Optional[List[ApplicationCommand]] = None, - guild_id: Optional[int] = None, - force: bool = True, + self, + commands: Optional[List[ApplicationCommand]] = None, + guild_id: Optional[int] = None, + method: Literal["individual", "bulk", "auto"] = "bulk", + force: bool = False, + delete_existing: bool = True, ) -> List[interactions.ApplicationCommand]: """|coro| @@ -362,11 +399,15 @@ async def register_commands( guild_id: Optional[int] If this is set, the commands will be registered as a guild command for the respective guild. If it is not set, the commands will be registered according to their :attr:`ApplicationCommand.guild_ids` attribute. + method: Literal['individual', 'bulk', 'auto'] + The method to use when registering the commands. If this is set to "individual", then each command will be + registered individually. If this is set to "bulk", then all commands will be registered in bulk. If this is + set to "auto", then the method will be determined automatically. Defaults to "bulk". force: :class:`bool` - Registers the commands regardless of the state of the command on Discord. This can sometimes cause commands - to be re-registered without changes (The command can temporarily appear as an invalid command on the user's - side) due to a bug in the API, but is a more foolproof method of registering - commands. Defaults to True. + Registers the commands regardless of the state of the command on Discord. This uses one less API call, but + can result in hitting rate limits more often. Defaults to False. + delete_existing: :class:`bool` + Whether to delete existing commands that are not in the list of commands to register. Defaults to True. """ if commands is None: commands = self.pending_application_commands @@ -391,7 +432,7 @@ async def register_commands( "edit": self._bot.http.edit_global_command, } - def register(method: str, *args, **kwargs): + def _register(method: Literal["bulk", "upsert", "delete", "edit"], *args, **kwargs): return registration_methods[method](self._bot.user.id, *args, **kwargs) else: @@ -408,26 +449,42 @@ def register(method: str, *args, **kwargs): "edit": self._bot.http.edit_guild_command, } - def register(method: str, *args, **kwargs): + def _register(method: Literal["bulk", "upsert", "delete", "edit"], *args, **kwargs): return registration_methods[method](self._bot.user.id, guild_id, *args, **kwargs) + def register(method: Literal["bulk", "upsert", "delete", "edit"], *args, **kwargs): + if kwargs.pop("_log", True): + if method == "bulk": + _log.debug(f"Bulk updating commands {[c['name'] for c in args[0]]} for guild {guild_id}") + elif method == "upsert": + _log.debug(f"Creating command {cmd['name']} for guild {guild_id}") + elif method == "edit": + _log.debug(f"Editing command {cmd['name']} for guild {guild_id}") + elif method == "delete": + _log.debug(f"Deleting command {cmd['name']} for guild {guild_id}") + return _register(method, *args, **kwargs) + pending_actions = [] if not force: - desynced = await self.get_desynced_commands(guild_id=guild_id) + if guild_id is None: + prefetched_commands = await self.http.get_global_commands(self.user.id) + else: + prefetched_commands = await self.http.get_guild_commands(self.user.id, guild_id) + desynced = await self.get_desynced_commands(guild_id=guild_id, prefetched=prefetched_commands) for cmd in desynced: if cmd["action"] == "delete": pending_actions.append( { - "action": "delete", - "command": cmd["id"], - "name": cmd["command"], + "action": "delete" if delete_existing else None, + "command": collections.namedtuple("Command", ["name"])(name=cmd["command"]), + "id": cmd["id"], } ) continue # We can assume the command item is a command, since it's only a string if action is delete - match = get(pending, name=cmd["command"].name) + match = get(pending, name=cmd["command"].name, type=cmd["command"].type) if match is None: continue if cmd["action"] == "edit": @@ -445,18 +502,35 @@ def register(method: str, *args, **kwargs): "command": match, } ) + elif cmd["action"] is None: + pending_actions.append( + { + "action": None, + "command": match, + } + ) else: raise ValueError(f"Unknown action: {cmd['action']}") - + filtered_no_action = list(filter(lambda c: c["action"] is not None, pending_actions)) filtered_deleted = list(filter(lambda a: a["action"] != "delete", pending_actions)) - if len(filtered_deleted) == len(pending): - # It appears that all the commands need to be modified, so we can just do a bulk upsert + if method == "bulk" or (method == "auto" and len(filtered_deleted) == len(pending)): + # Either the method is bulk or all the commands need to be modified, so we can just do a bulk upsert data = [cmd["command"].to_dict() for cmd in filtered_deleted] - registered = await register("bulk", data) + # If there's nothing to update, don't bother + if len(filtered_no_action) == 0: + _log.debug("Skipping bulk command update: Commands are up to date") + registered = prefetched_commands + else: + _log.debug( + f"Bulk updating commands %s for guild %s", + {c['command'].name: c['action'] for c in pending_actions}, + guild_id + ) + registered = await register("bulk", data, _log=False) else: - if len(pending_actions) == 0: + if not filtered_no_action: registered = [] - for cmd in pending_actions: + for cmd in filtered_no_action: if cmd["action"] == "delete": await register("delete", cmd["command"]) continue @@ -466,16 +540,17 @@ def register(method: str, *args, **kwargs): registered.append(await register("upsert", cmd["command"].to_dict())) else: raise ValueError(f"Unknown action: {cmd['action']}") + + # TODO: Our lists dont work sometimes, see if that can be fixed so we can avoid this second API call + if method != "bulk": + if guild_id is None: + registered = await self._bot.http.get_global_commands(self._bot.user.id) + else: + registered = await self._bot.http.get_guild_commands(self._bot.user.id, guild_id) else: data = [cmd.to_dict() for cmd in pending] registered = await register("bulk", data) - # TODO: Our lists dont work sometimes, see if that can be fixed so we can avoid this second API call - if guild_id is None: - registered = await self._bot.http.get_global_commands(self._bot.user.id) - else: - registered = await self._bot.http.get_guild_commands(self._bot.user.id, guild_id) - for i in registered: cmd = get( self.pending_application_commands, @@ -490,12 +565,14 @@ def register(method: str, *args, **kwargs): return registered async def sync_commands( - self, - commands: Optional[List[ApplicationCommand]] = None, - force: bool = True, - guild_ids: Optional[List[int]] = None, - register_guild_commands: bool = True, - unregister_guilds: Optional[List[int]] = None, + self, + commands: Optional[List[ApplicationCommand]] = None, + method: Literal["individual", "bulk", "auto"] = "bulk", + force: bool = False, + guild_ids: Optional[List[int]] = None, + register_guild_commands: bool = True, + check_guilds: Optional[List[int]] = [], + delete_exiting: bool = True, ) -> None: """|coro| @@ -518,22 +595,29 @@ async def sync_commands( ---------- commands: Optional[List[:class:`~.ApplicationCommand`]] A list of commands to register. If this is not set (None), then all commands will be registered. + method: Literal['individual', 'bulk', 'auto'] + The method to use when registering the commands. If this is set to "individual", then each command will be + registered individually. If this is set to "bulk", then all commands will be registered in bulk. If this is + set to "auto", then the method will be determined automatically. Defaults to "bulk". force: :class:`bool` - Registers the commands regardless of the state of the command on Discord. This can sometimes cause commands - to be re-registered without changes due to a bug in the API, but is sometimes a more foolproof method of - registering commands. Defaults to True. + Registers the commands regardless of the state of the command on Discord. This uses one less API call, but + can result in hitting rate limits more often. Defaults to False. guild_ids: Optional[List[:class:`int`]] A list of guild ids to register the commands for. If this is not set, the commands' :attr:`~.ApplicationCommand.guild_ids` attribute will be used. register_guild_commands: :class:`bool` Whether to register guild commands. Defaults to True. - unregister_guilds: Optional[List[:class:`int`]] + check_guilds: Optional[List[:class:`int`]] A list of guilds ids to check for commands to unregister, since the bot would otherwise have to check all guilds. Unlike ``guild_ids``, this does not alter the commands' :attr:`~.ApplicationCommand.guild_ids` attribute, instead it adds the guild ids to a list of guilds to sync commands for. If ``register_guild_commands`` is set to False, then this parameter is ignored. + delete_exiting: :class:`bool` + Whether to delete existing commands that are not in the list of commands to register. Defaults to True. """ + check_guilds = list(set(check_guilds + (self.debug_guilds or []))) + if commands is None: commands = self.pending_application_commands @@ -542,7 +626,8 @@ async def sync_commands( cmd.guild_ids = guild_ids global_commands = [cmd for cmd in commands if cmd.guild_ids is None] - registered_commands = await self.register_commands(global_commands, force=force) + registered_commands = await self.register_commands(global_commands, method=method, force=force, + delete_existing=delete_exiting) registered_guild_commands = {} @@ -551,12 +636,12 @@ async def sync_commands( for cmd in commands: if cmd.guild_ids is not None: cmd_guild_ids.extend(cmd.guild_ids) - if unregister_guilds is not None: - cmd_guild_ids.extend(unregister_guilds) + if check_guilds is not None: + cmd_guild_ids.extend(check_guilds) for guild_id in set(cmd_guild_ids): guild_commands = [cmd for cmd in commands if cmd.guild_ids is not None and guild_id in cmd.guild_ids] registered_guild_commands[guild_id] = await self.register_commands( - guild_commands, guild_id=guild_id, force=force + guild_commands, guild_id=guild_id, method=method, force=force, delete_existing=delete_exiting ) # TODO: 2.1: Remove this and favor permissions v2 @@ -739,9 +824,11 @@ async def process_application_commands(self, interaction: Interaction, auto_sync if guild_id is None: await self.sync_commands() else: - await self.sync_commands(unregister_guilds=[guild_id]) + + await self.sync_commands(check_guilds=[guild_id]) return self._bot.dispatch("unknown_application_command", interaction) + if interaction.type is InteractionType.auto_complete: return self.dispatch("application_command_auto_complete", interaction, command) @@ -852,6 +939,7 @@ def create_group( name: str, description: Optional[str] = None, guild_ids: Optional[List[int]] = None, + **kwargs ) -> SlashCommandGroup: """A shortcut method that creates a slash command group with no subcommands and adds it to the internal command list via :meth:`~.ApplicationCommandMixin.add_application_command`. @@ -867,6 +955,8 @@ def create_group( guild_ids: Optional[List[:class:`int`]] A list of the IDs of each guild this group should be added to, making it a guild command. This will be a global command if ``None`` is passed. + kwargs: + Any additional keyword arguments to pass to :class:`.SlashCommandGroup`. Returns -------- @@ -874,7 +964,7 @@ def create_group( The slash command group that was created. """ description = description or "No description provided." - group = SlashCommandGroup(name, description, guild_ids) + group = SlashCommandGroup(name, description, guild_ids, **kwargs) self.add_application_command(group) return group @@ -1181,7 +1271,7 @@ def whitelist(ctx): async def can_run(self, ctx: ApplicationContext, *, call_once: bool = False) -> bool: data = self._check_once if call_once else self._checks - if len(data) == 0: + if not data: return True # type-checker doesn't distinguish between functions and methods diff --git a/discord/commands/core.py b/discord/commands/core.py index 8f3677aba9..33b25b86f0 100644 --- a/discord/commands/core.py +++ b/discord/commands/core.py @@ -742,12 +742,14 @@ def is_subcommand(self) -> bool: def to_dict(self) -> Dict: as_dict = { "name": self.name, - "name_localizations": self.name_localizations, "description": self.description, - "description_localizations": self.description_localizations, "options": [o.to_dict() for o in self.options], "default_permission": self.default_permission, } + if self.name_localizations is not None: + as_dict["name_localizations"] = self.name_localizations + if self.description_localizations is not None: + as_dict["description_localizations"] = self.description_localizations if self.is_subcommand: as_dict["type"] = SlashCommandOptionType.sub_command.value @@ -943,7 +945,7 @@ def __init__( ) -> None: validate_chat_input_name(name) validate_chat_input_description(description) - self.name = name + self.name = str(name) self.description = description self.input_type = SlashCommandOptionType.sub_command_group self.subcommands: List[Union[SlashCommand, SlashCommandGroup]] = self.__initial_commands__ @@ -961,6 +963,8 @@ def __init__( self.permissions: List[CommandPermission] = kwargs.get("permissions", []) if self.permissions and self.default_permission: self.default_permission = False + self.name_localizations: Optional[Dict[str, str]] = kwargs.get("name_localizations", None) + self.description_localizations: Optional[Dict[str, str]] = kwargs.get("description_localizations", None) @property def module(self) -> Optional[str]: @@ -973,6 +977,10 @@ def to_dict(self) -> Dict: "options": [c.to_dict() for c in self.subcommands], "default_permission": self.default_permission, } + if self.name_localizations is not None: + as_dict["name_localizations"] = self.name_localizations + if self.description_localizations is not None: + as_dict["description_localizations"] = self.description_localizations if self.parent is not None: as_dict["type"] = self.input_type.value diff --git a/discord/commands/options.py b/discord/commands/options.py index de5c314b3b..d1c7262edf 100644 --- a/discord/commands/options.py +++ b/discord/commands/options.py @@ -117,6 +117,8 @@ async def hello( def __init__(self, input_type: Any, /, description: str = None, **kwargs) -> None: self.name: Optional[str] = kwargs.pop("name", None) + if self.name is not None: + self.name = str(self.name) self.description = description or "No description provided" self.converter = None self._raw_type = input_type @@ -185,9 +187,11 @@ def to_dict(self) -> Dict: "required": self.required, "choices": [c.to_dict() for c in self.choices], "autocomplete": bool(self.autocomplete), - "name_localizations": self.name_localizations, - "description_localizations": self.description_localizations, } + if self.name_localizations is not None: + as_dict["name_localizations"] = self.name_localizations + if self.description_localizations is not None: + as_dict["description_localizations"] = self.description_localizations if self.channel_types: as_dict["channel_types"] = [t.value for t in self.channel_types] if self.min_value is not None: @@ -220,12 +224,19 @@ class OptionChoice: def __init__(self, name: str, value: Optional[Union[str, int, float]] = None, name_localizations: Optional[Dict[str, str]] = None): - self.name = name + self.name = str(name) self.value = value if value is not None else name self.name_localizations = name_localizations def to_dict(self) -> Dict[str, Union[str, int, float]]: - return {"name": self.name, "value": self.value, "name_localizations": self.name_localizations} + as_dict = { + "name": self.name, + "value": self.value + } + if self.name_localizations is not None: + as_dict["name_localizations"] = self.name_localizations + + return as_dict def option(name, type=None, **kwargs):