diff --git a/mojang/api/__init__.py b/mojang/api/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/mojang/api/auth/__init__.py b/mojang/api/auth/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/mojang/api/auth/security.py b/mojang/api/auth/security.py deleted file mode 100644 index 09c00c7e..00000000 --- a/mojang/api/auth/security.py +++ /dev/null @@ -1,96 +0,0 @@ -from ...error.exceptions import * -from ...utils import web -from ..urls import SECURITY_CHALLENGES, SECURITY_CHECK - - -def is_user_ip_secure(access_token: str) -> bool: - """Check if authenticated user IP is secure - - Args: - access_token (str): The session's access token - - Returns: - True if IP is secure else False - - Raises: - Unauthorized: If access token is invalid - PayloadError: If access token is not formated correctly - """ - try: - web.auth_request('get', SECURITY_CHECK, access_token, exceptions=(PayloadError, Unauthorized, IPNotSecured)) - except IPNotSecured: - return False - else: - return True - -def get_user_challenges(access_token: str) -> list: - """Return a list of challenges to verify IP - - Args: - access_token (str): The session's access token - - Returns: - A list of tuples, each one contains the answer's id and the - question - - Example: - - Example of challenges - ```python - [ - (123, "What is your favorite pet's name?"), - (456, "What is your favorite movie?"), - (789, "What is your favorite author's last name?") - ] - ``` - - Raises: - Unauthorized: If access token is invalid - PayloadError: If access token is not formated correctly - """ - data = web.auth_request('get', SECURITY_CHALLENGES, access_token, exceptions=(PayloadError, Unauthorized)) - challenges = [] - if data: - for item in data: - answer_id = item['answer']['id'] - question = item['question']['question'] - challenges.append((answer_id, question)) - return challenges - -def verify_user_ip(access_token: str, answers: list) -> bool: - """Verify IP with the given answers - - Args: - access_token (str): The session's access token - answers (list): The answers to the question - - Example: - - ```python - answers = [ - (123, "foo"), - (456, "bar"), - (789, "baz") - ] - security.verify_user_ip(ACCESS_TOKEN, answers) - ``` - - Returns: - True if IP is secure else False - - Raises: - Unauthorized: If access token is invalid - PayloadError: If access token is not formated correctly - """ - formatted_answers = [] - for answer in answers: - formatted_answers.append({ - 'id': answer[0], - 'answer': answer[1] - }) - try: - web.auth_request('post', SECURITY_CHECK, access_token, exceptions=(PayloadError, Unauthorized, IPVerificationError), json=formatted_answers) - except IPVerificationError: - return False - else: - return True diff --git a/mojang/api/auth/yggdrasil.py b/mojang/api/auth/yggdrasil.py deleted file mode 100644 index 23edb018..00000000 --- a/mojang/api/auth/yggdrasil.py +++ /dev/null @@ -1,130 +0,0 @@ -""" -Mojang Yggdrasil authentication system api -""" - -from ...error.exceptions import * -from ...utils import web -from ..urls import AUTHENTICATE, INVALIDATE, REFRESH, SIGNOUT, VALIDATE - - -def authenticate_user(username: str, password: str, client_token: str = None) -> dict: - """Authenticate user with name and password - - Args: - username (str): The username or email if account is not legacy - password (str): The user password - client_token (str, optional): The client token to use in the authentication (default to None) - - Returns: - A dict with thet following keys : `access_token`, `client_token`, `uuid`, `name`, - `legacy` and `demo` - - Raises: - CredentialsError: If username and password are invalid - PayloadError: If credentials are not formated correctly - """ - payload = { - 'username': username, - 'password': password, - 'clientToken': client_token, - 'agent': { - 'name': 'Minecraft', - 'version': 1 - } - } - - data = web.request('post', AUTHENTICATE, exceptions=(PayloadError, CredentialsError), json=payload) - - return { - 'access_token': data['accessToken'], - 'client_token': data['clientToken'], - 'uuid': data['selectedProfile']['id'], - 'name': data['selectedProfile']['name'], - 'legacy': data['selectedProfile'].get('legacy', False), - 'demo': not data['selectedProfile'].get('paid', True) - } - -def refresh_token(access_token: str, client_token: str) -> dict: - """Refresh an invalid access token - - Args: - access_token (str): The access token to refresh - client_token (str): The client token used to generate the access token - - Returns: - A dict with the following keys: `access_token`, `client_token`, `uuid`, `name`, - `legacy` and `demo` - - Raises: - TokenError: If client token is not the one used to generate the access token - PayloadError: If the tokens are not formated correctly - """ - payload = { - 'accessToken': access_token, - 'clientToken': client_token - } - - data = web.request('post', REFRESH, exceptions=(PayloadError, TokenError), json=payload) - - return { - 'access_token': data['accessToken'], - 'client_token': data['clientToken'], - 'uuid': data['selectedProfile']['id'], - 'name': data['selectedProfile']['name'], - 'legacy': data['selectedProfile'].get('legacy', False), - 'demo': not data['selectedProfile'].get('paid', True) - } - -def validate_token(access_token: str, client_token: str): - """Validate an access token - - Args: - access_token (str): The access token to validate - client_token (str): The client token used to generate the access token - - Raises: - TokenError: If client token is not the one used to generate the access token - PayloadError: If the tokens are not formated correctly - """ - payload = { - 'accessToken': access_token, - 'clientToken': client_token - } - - web.request('post', VALIDATE, exceptions=(PayloadError, TokenError), json=payload) - -def signout_user(username: str, password: str): - """Signout user with name and password - - Args: - username (str): The username or email if account is not legacy - password (str): The user password - - Raises: - CredentialsError: If username and password are invalid - PayloadError: If credentials are not formated correctly - """ - payload = { - 'username': ctx.username, - 'password': ctx.password - } - - web.request('post', SIGNOUT, exceptions=(PayloadError, CredentialsError), json=payload) - -def invalidate_token(access_token: str, client_token: str): - """Invalidate an access token - - Args: - access_token (str): The access token to invalidate - client_token (str): The client token used to generate the access token - - Raises: - TokenError: If client token is not the one used to generate the access token - PayloadError: If the tokens are not formated correctly - """ - payload = { - 'accessToken': access_token, - 'clientToken': client_token - } - - web.request('post', INVALIDATE, exceptions=(PayloadError, TokenError), json=payload) diff --git a/mojang/api/base.py b/mojang/api/base.py deleted file mode 100644 index 94961417..00000000 --- a/mojang/api/base.py +++ /dev/null @@ -1,138 +0,0 @@ -""" -Functions for the basic MojangAPI -""" -import datetime as dt -import json -from base64 import urlsafe_b64decode - -from ..error.exceptions import PayloadError -from ..utils import web -from ..utils.cape import Cape -from ..utils.skin import Skin -from .urls import GET_PROFILE, GET_UUID, GET_UUIDS, NAME_HISTORY, STATUS_CHECK - - -def api_status(service: str = None) -> dict: - """Get the status of Mojang's services - - Args: - service (str, optional): The name of the service which you want to know the status - - Returns: - If service is given, return only the status for this service else return a dict - with all the status for each services - """ - res = {} - - data = web.request('get', STATUS_CHECK) - for s in data: - res.update(s) - - if service: - return res[service] - - return res - -def name_history(uuid: str) -> list: - """Get the user's name history - - Args: - uuid (str): The user's uuid - - Returns: - A list of tuples, each containing the name and the datetime at which it was - changed to - """ - names = [] - - data = web.request('get', NAME_HISTORY.format(uuid=uuid)) - for item in data: - changed_at = None - if 'changedToAt' in item.keys(): - changed_at = dt.datetime.fromtimestamp(item['changedToAt'] / 1000) - names.append((item['name'], changed_at)) - - return names - -def get_uuid(username: str, only_uuid: bool = True) -> dict: - """Get uuid of username - - Args: - username (str): The username which you want the uuid of - only_uuid (bool): If True only the uuid is returned (default True) - - Returns: - If only_uuid is True, then only the uuid of the username is returned. - If not, a dict is returned with the following keys: `name`, `uuid`, - `legacy` and `demo`. - """ - data = web.request('get', GET_UUID.format(name=username)) - - if data: - data['uuid'] = data.pop('id') - data['legacy'] = data.get('legacy', False) - data['demo'] = data.get('demo', False) - - if only_uuid: - return data['uuid'] - else: - data = None - - return data - -def get_uuids(usernames: list, only_uuid: bool = True) -> list: - """Get uuid of multiple username - - Note: Limited Endpoint - The Mojang API only allow 10 usernames maximum, if more than 10 usernames are - given to the function, multiple request will be made. - - Args: - usernames (list): The list of username which you want the uuid of - only_uuid (bool): If True only the uuid is returned for each username (default True) - - Returns: - If only_uuid is True, then a list of uuid is returned. If not, a list of dict is returned. - Each dict contains the following keys: `name`, `uuid`, `legacy` and `demo`. - """ - res = [None]*len(usernames) - - for i in range(0, len(usernames), 10): - data = web.request('post', GET_UUIDS, exceptions=(PayloadError,),json=usernames[i:i+10]) - for item in data: - index = usernames.index(item['name']) - if not only_uuid: - res[index] = { - 'uuid': item['id'], - 'name': item['name'], - 'legacy': item.get('legacy',False), - 'demo': item.get('demo', False) - } - else: - res[index] = item['id'] - - return res - -def get_profile(uuid: str) -> dict: - """Get profile information by uuid - - Args: - uuid (str): The uuid of the profile - - Returns: - A dict with the following keys: `uuid`, `name`, `skins` and `capes` - """ - data = web.request('get', GET_PROFILE.format(uuid=uuid), exceptions=(PayloadError,)) - if data: - res = {'name': None, 'uuid': None,'skins': [], 'capes': []} - res['uuid'] = data['id'] - res['name'] = data['name'] - - for d in data['properties']: - textures = json.loads(urlsafe_b64decode(d['value']))['textures'] - if 'SKIN' in textures.keys(): - res['skins'].append(Skin(textures['SKIN']['url'], textures['SKIN'].get('metadata',{}).get('model','classic'))) - if 'CAPE' in textures.keys(): - res['skins'].append(Cape(textures['CAPE']['url'])) - - return res diff --git a/mojang/api/files/arguments.py b/mojang/api/files/arguments.py deleted file mode 100644 index b6a259ca..00000000 --- a/mojang/api/files/arguments.py +++ /dev/null @@ -1,113 +0,0 @@ -import platform -import re - -class Arguments: - - def __init__(self, game_args: list, jvm_args: list): - self.game_args = self._parse_args(game_args, GameArgument) - self.jvm_args = self._parse_args(jvm_args, JvmArgument) - self.all = self.game_args + self.jvm_args - - def _parse_args(self, args: list, cls): - _args = [] - - if isinstance(args, list): - for arg in args: - if isinstance(arg, dict): - rules = arg.get('rules', None) - - if isinstance(arg['value'], str): - values = [arg['value']] - else: - values = arg['value'] - - for value in values: - _args.append(cls(value, rules)) - else: - _args.append(cls(arg, None)) - - return tuple(_args) - -class GameArgument: - - def __init__(self, argument: str, rules=None): - self.argument = argument - self._parse_rules(rules) - - def _parse_rules(self, rules: list): - _allowed_features = {} - _disallowed_features = {} - if rules: - - rules.sort(key=lambda item: item['action']) - - for rule in rules: - features = rule.get('features', {}) - if rule['action'] == 'allow': - _allowed_features.update(features) - else: - _disallowed_features.update(features) - - self.allowed_features = tuple(_allowed_features.items()) - self.disallowed_features = tuple(_disallowed_features.items()) - - -class JvmArgument: - - def __init__(self, argument: str, rules=None): - self.argument = argument - self._parse_rules(rules) - - def _parse_rules(self, rules: list): - _os = {'windows', 'linux', 'darwin'} - _version = dict.fromkeys(_os) - _arch = {'x86','x64'} - - if rules: - - rules.sort(key=lambda item: item['action']) - - for rule in rules: - os = rule.get('os', {}) - os_name = os.get('name', None) - os_name = 'darwin' if os_name == 'osx' else os_name - os_version = os.get('version', None) - os_arch = os.get('arch', None) - - if rule['action'] == 'allow': - if os_name: - _os = {os_name} - _version[os_name] = os_version - - if os_arch: - _arch = {os_arch} - else: - if os_name: - _os.remove(os_name) - _version.pop(os_name) - - if os_arch: - _arch.remove(os_arch) - - if not os: - _os = set() - _arch = set() - - - self.allowed_os = tuple(_os) - self.allowed_versions = tuple(_version.items()) - self.allowed_arch = tuple(_arch) - - def required(self, system=None, version=None, architecture=None): - if not system: - system = platform.system().lower() - if not version: - version = platform.version() - if not architecture: - architecture = 'x86' if platform.architecture()[0] == 32 else 'x64' - - is_system_ok = system in self.allowed_os - is_version_ok = [(item[1] is None or re.match(item[1], version)) for item in self.allowed_versions if item[0] == system] - is_arch_ok = architecture in self.allowed_arch - - return is_system_ok and is_version_ok and is_arch_ok diff --git a/mojang/api/files/assets.py b/mojang/api/files/assets.py deleted file mode 100644 index f0ce4f3b..00000000 --- a/mojang/api/files/assets.py +++ /dev/null @@ -1,51 +0,0 @@ -import json -import os -from os import path - -import requests - -from ...utils import web -from ..urls import MC_VERSIONS, RESOURCES_DOWNLOAD - - -class AssetIndex: - - def __init__(self, id: str, size: int, total_size: int, url: str): - self.id = id - self.size = size - self.total_size = total_size - self.url = url - - @property - def _data(self): - return web.request('get', self.url) - - def download(self, directory: str): - # Retrieve asset index - data = self._data - - # Create index.json in /indexes - indexes_dir = path.join(directory, 'indexes') - if not path.exists(indexes_dir): - os.mkdir(indexes_dir) - - index_file = path.join(indexes_dir, f'{self.id}.json') - with open(index_file, 'w') as fp: - json.dump(data, fp) - - # Create hash files in /objects - objects_dir = path.join(directory, 'objects') - if not path.exists(objects_dir): - os.mkdir(objects_dir) - - for _, value in data['objects'].items(): - resources_hash = value['hash'] - response = requests.get(RESOURCES_DOWNLOAD.format(resources_hash[:2], resources_hash)) - - resource_dir = path.join(objects_dir, resources_hash[:2]) - if not path.exists(resource_dir): - os.mkdir(resource_dir) - - resource_file = path.join(resource_dir, resources_hash) - with open(resource_file, 'wb') as fp: - fp.write(response.content) diff --git a/mojang/api/files/libraries.py b/mojang/api/files/libraries.py deleted file mode 100644 index f76eb4df..00000000 --- a/mojang/api/files/libraries.py +++ /dev/null @@ -1,97 +0,0 @@ -import os -import platform -from os import path - -import requests - - -class Librairies: - - def __init__(self, data: dict): - librairies = {} - natives = {} - - for item in data: - name = item['name'] - rules = item.get('rules', None) - artifact = item['downloads'].get('artifact', None) - if artifact: - librairies[artifact['sha1']] = Library(name, artifact['path'], artifact['sha1'], artifact['size'], artifact['url'], False, None, rules) - - natives_artifact = item['downloads'].get('classifiers', {}) - for native_name, native in natives_artifact.items(): - natives[native['sha1']] = Library(name, native['path'], native['sha1'], native['size'], native['url'], True, native_name, rules) - - self.librairies = tuple(librairies.values()) - self.natives = tuple(natives.values()) - self.all = self.librairies + self.natives - - def download_all(self, directory: str, system=None): - for lib in self.all: - lib.download(directory, system) - - def paths(self, directory: str, system=None): - return [path.join(directory,lib.path,lib.filename) for lib in self.all if lib.required(system)] - -class Library: - - def __init__(self, name: str, _path: str, _hash: str, size: int, url: str, is_native=False, native_name=None, rules=None): - self.name = name - self.path, self.filename = path.split(_path) - self.hash = _hash - self.size = size - self.url = url - self.native_name = native_name - self.is_native = is_native - - self._parse_rules(rules) - - def _parse_rules(self, rules: list): - _os = {'windows','linux','darwin'} - - if rules: - # allowed will be processed before disallowed - rules.sort(key=lambda item: item['action']) - - for rule in rules: - os_name = rule.get('os', {}).get('name', None) - os_name = 'darwin' if os_name == 'osx' else os_name - - if rule['action'] == 'allow': - if os_name: - _os = {os_name} - else: - if os_name: - _os.remove(os_name) - else: - _os = set() - - if self.is_native: - if 'windows' in self.native_name and 'windows' in _os: - _os = {'windows'} - elif 'linux' in self.native_name and 'linux' in _os: - _os = {'linux'} - elif ('macos' in self.native_name or 'osx' in self.native_name) and 'darwin' in _os: - _os = {'darwin'} - else: - _os = set() - - self.allowed_os = _os - - def required(self, system=None): - if not system: - system = platform.system().lower() - - return system in self.allowed_os - - def download(self, directory: str, system=None): - if self.required(system) and path.exists(directory): - _path = path.join(directory, self.path) - if not path.exists(_path): - os.makedirs(_path) - - file_path = path.join(_path, self.filename) - - response = requests.get(self.url) - with open(file_path, 'wb') as fp: - fp.write(response.content) diff --git a/mojang/api/files/versions.py b/mojang/api/files/versions.py deleted file mode 100644 index f8db6fb9..00000000 --- a/mojang/api/files/versions.py +++ /dev/null @@ -1,342 +0,0 @@ -import datetime as dt -import json -import os -from os import path - -import requests - -from ...utils import web -from ..urls import MC_VERSIONS -from .arguments import Arguments -from .assets import AssetIndex -from .libraries import Librairies - - -class ServerInstallation: - - def __init__(self, version: 'MinecraftVersion', **kwargs): - self.__version = version - self.__system = kwargs.get('system') - - def __enter__(self): - return self - - def __exit__(self, *args): - pass - - def install(self, directory: str, filename=None): - if not path.exists(directory): - return - - if not filename: - filename = f'server-{self.__version.id}.jar' - - filepath = path.join(directory, filename) - - server = self.__version.server[0] - if self.__system == 'windows' and self.__version.server[1]: - server = self.__version.server[1] - - response = requests.get(server[0]) - with open(filepath, 'wb') as fp: - fp.write(response.content) - - def create_config(self, config_path: str): - pass - - -class ClientInstallation: - - def __init__(self, version: 'MinecraftVersion', **kwargs): - self.__version = version - self.__system = kwargs.get('system', None) - self.__sys_version = kwargs.get('version', None) - self.__arch = kwargs.get('arch', None) - - def __enter__(self): - self.__asset_dir = None - self.__libs_dir = None - self.__client_path = None - return self - - def __exit__(self, *args): - self.__asset_dir = None - self.__libs_dir = None - self.__client_path = None - - def _ensure_dir(self, directory: str): - if not path.exists(directory): - os.makedirs(directory, exist_ok=True) - - return directory - - def install(self, directory: str): - if not path.exists(directory): - return - - assets_dir = self._ensure_dir(path.join(directory, 'assets')) - libs_dir = self._ensure_dir(path.join(directory, 'libraries')) - client_dir = self._ensure_dir(path.join(directory, 'versions', self.__version.id)) - client_path = path.join(client_dir, f'client-{self.__version.id}.jar') - - print('Downloading assets...') - self.download_assets(assets_dir) - print('Downloading libraries...') - self.download_libs(libs_dir) - print('Downloading client...') - self.download_client(client_path) - - def download_assets(self, directory: str): - self.__version.asset_index.download(directory) - self.__asset_dir = directory - - def download_libs(self, directory: str): - self.__version.libraries.download_all(directory, self.__system) - self.__libs_dir = directory - - def download_client(self, filepath: str): - response = requests.get(self.__version.client[0]) - with open(filepath, 'wb') as fp: - fp.write(response.content) - - self.__client_path = filepath - - def create_config(self, config_path: str, game_directory: str): - # Assets - index_file = path.join(self.__asset_dir, 'indexes', f'{self.__version.asset_index.id}.json') - - # Libraries - installed_libs = [] - installed_natives = [] - - filtered = filter(lambda lib: lib.required(self.__system), self.__version.libraries.all) - for lib in filtered: - if not lib.is_native: - installed_libs.append(path.join(self.__libs_dir, lib.path, lib.filename)) - else: - installed_natives.append(path.join(self.__libs_dir, lib.path, lib.filename)) - - # Game Argument - game_args = {} - for garg in self.__version.arguments.game_args: - game_args[garg.argument] = { - 'allowed': dict(garg.allowed_features), - 'disallowed': dict(garg.disallowed_features) - } - - jvm_args = [] - for jarg in self.__version.arguments.jvm_args: - if jarg.required(self.__system, self.__sys_version, self.__arch): - jvm_args.append(jarg.argument) - - # Create config - config = { - 'assets': { - 'id': self.__version.asset_index.id, - 'directory': self.__asset_dir, - 'index_file': index_file - }, - 'libraries': installed_libs, - 'natives': installed_natives, - 'client': { - 'id': self.__version.id, - 'type': self.__version.type, - 'main_class': self.__version.main_class - }, - 'game_directory': game_directory, - 'arguments': { - 'game': game_args, - 'jvm': jvm_args - } - } - with open(config_path, 'w') as fp: - json.dump(config, fp) - - -class MinecraftVersions: - __instance = None - - def __new__(cls): - if not cls.__instance: - cls.__instance = object.__new__(cls) - - return cls.__instance - - def __init__(self): - self.__versions = [] - self.__last_release = None - self.__last_snapshot = None - - def refresh(self): - data = web.request('get', MC_VERSIONS) - - versions = [] - for version in data['versions']: - obj = MinecraftVersion(version['id'], version['url'], version['type']) - - if obj.is_snapshot and obj.id == data['latest']['snapshot']: - self.__last_snapshot = obj - elif obj.is_release and obj.id == data['latest']['release']: - self.__last_release = obj - - versions.append(obj) - - self.__versions = versions - - @property - def last_release(self) -> 'MinecraftVersion': - return self.__last_release - - @property - def last_snapshot(self) -> 'MinecraftVersion': - return self.__last_snapshot - - @property - def all(self) -> list: - return self.__versions - - def get(self, _id: str) -> 'MinecraftVersion': - version = [v for v in self.__versions if v.id == _id] - if len(version) > 0: - version[0]._load() - return version[0] - - def __getitem__(self, key: str): - return self.get(key) - - -class MinecraftVersion: - - def __init__(self, _id: str, url: str, _type: str): - self.__id = _id - self.__url = url - self.__type = _type - self.__loaded = False - - self.__time = None - self.__release_time = None - - self.__main_class = None - self.__minimum_launcher_version = None - - self.__client = None - self.__server = None - self.__win_server = None - - self.__java_version = None - - self.__asset_index = None - self.__libraries = None - - self.__arguments = None - - def _load(self): - if not self.__loaded: - data = web.request('get', self.__url) - - # Load release time - self.__time = dt.datetime.strptime(data['time'], '%Y-%m-%dT%H:%M:%S%z') - self.__release_time = dt.datetime.strptime(data['releaseTime'], '%Y-%m-%dT%H:%M:%S%z') - - # Load basic info - self.__main_class = data['mainClass'] - self.__minimum_launcher_version = data['minimumLauncherVersion'] - - # Load Java version - _java_version = data.get('javaVersion', None) - if _java_version: - self.__java_version = (_java_version['component'], _java_version['majorVersion']) - - # Load Downloads - _downloads = data.get('downloads', None) - if _downloads: - client = _downloads['client'] - client_mappings = _downloads.get('client_mappings', dict.fromkeys(['url', 'size'], None)) - - self.__client = (client['url'], client['size'], client_mappings['url'], client_mappings['size']) - - server = _downloads.get('server', None) - if server: - server_mappings = _downloads.get('server_mappings', dict.fromkeys(['url', 'size'], None)) - - self.__server = (server['url'], server['size'], server_mappings['url'], server_mappings['size']) - - win_server = _downloads.get('windows_server', None) - if win_server: - self.__win_server = (win_server['url'], win_server['size'], None, None) - - # Load asset index - asset_index = data['assetIndex'] - self.__asset_index = AssetIndex(asset_index['id'], asset_index['size'], asset_index['totalSize'], asset_index['url']) - - # Load Libraries - self.__libraries = Librairies(data['libraries']) - - # Load Arguments - if 'arguments' in data: - self.__arguments = Arguments(data['arguments']['game'], data['arguments']['jvm']) - else: - self.__arguments = Arguments(data['minecraftArguments'].split(' '), None) - - # Prevent from loading again - self.__loaded = True - - # Properties - @property - def url(self): - return self.__url - - @property - def id(self): - return self.__id - - @property - def type(self): - return self.__type - - @property - def is_release(self): - return self.__type == 'release' - - @property - def is_snapshot(self): - return self.__type == 'snapshot' - - @property - def is_alpha(self): - return self.__type == 'old_alpha' - - @property - def is_beta(self): - return self.__type == 'old_beta' - - @property - def main_class(self): - return self.__main_class - - @property - def asset_index(self): - return self.__asset_index - - @property - def libraries(self): - return self.__libraries - - @property - def arguments(self): - return self.__arguments - - @property - def client(self): - return self.__client - - @property - def server(self): - return self.__server, self.__win_server - - # Context manager - def create_client(self, **kwargs) -> ClientInstallation: - return ClientInstallation(self, **kwargs) - - def create_server(self, **kwargs) -> ServerInstallation: - return ServerInstallation(self, **kwargs) diff --git a/mojang/api/net/__init__.py b/mojang/api/net/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/mojang/api/net/common/packet.py b/mojang/api/net/common/packet.py deleted file mode 100644 index a014c57c..00000000 --- a/mojang/api/net/common/packet.py +++ /dev/null @@ -1,43 +0,0 @@ -import io -from typing import IO - - -class BasePacket: - - def __new__(cls, **kwargs): - obj = object.__new__(cls) - - for key, value in obj._types().items(): - setattr(obj, key, kwargs.get(key, value.default)) - - return obj - - @classmethod - def from_bytes(cls, data: bytes): - with io.BytesIO(data) as buffer: - packet = cls.read(buffer) - - return packet - - @classmethod - def _types(cls): - return dict([(key, getattr(cls, key)) for key, value in getattr(cls, '__annotations__', {}).items()]) - - @classmethod - def read(cls, buffer: IO): - _dict = {} - for key, value in cls._types().items(): - _dict[key] = value.read(buffer) - return cls(**_dict) - - def write(self, buffer: IO): - for key, value in self._types().items(): - value.write(getattr(self, key), buffer) - - @property - def data(self): - with io.BytesIO() as buffer: - self.write(buffer) - data = buffer.getvalue() - - return data diff --git a/mojang/api/net/common/types.py b/mojang/api/net/common/types.py deleted file mode 100644 index c07139ee..00000000 --- a/mojang/api/net/common/types.py +++ /dev/null @@ -1,272 +0,0 @@ -import struct -import io -from typing import IO - -## Base Type ## -class Type: - _python_type = None - - def __new__(cls, default=None, **kwargs): - obj = object.__new__(cls) - - obj.__default = default - obj.__options = kwargs - - return obj - - @property - def default(self): - return self.__default - - def check_value(self, value): - pass - - def read(self, buffer: IO): - raise NotImplementedError() - - def write(self, value, buffer: IO): - raise NotImplementedError() - - -## Numeric Types ## -class NumericType(Type): - _python_type = int - - def __new__(cls, default=0, signed=True, **kwargs): - obj = super().__new__(cls, default, **kwargs) - obj.__signed = signed - obj.__format = '{}{}'.format(cls.__order, cls.__ctype[signed]) - return obj - - def __init_subclass__(cls, nbytes: int, order='big'): - cls.__nbytes = nbytes - - if nbytes == 1: - cls.__ctype = ['B','b'] - elif nbytes == 2: - cls.__ctype = ['H','h'] - elif nbytes == 4: - cls.__ctype = ['I', 'i'] - elif nbytes == 8: - cls.__ctype = ['Q', 'q'] - else: - raise ValueError('Invalid number of bytes. Valid values are `1`, `2`, `4` or `8`') - - if order == 'big': - cls.__order = '>' - elif order == 'little': - cls.__order = '<' - else: - cls.__order = '=' - - def check_value(self, value: int): - if not isinstance(value, int): - raise TypeError('Wrong type, got `{}` expect `int`'.format(type(value).__name__)) - - bits = struct.calcsize(self.__format) * 8 - min_val, max_val = 0, (2**bits) - 1 - if self.__signed: - min_val = -(2**bits) // 2 - max_val = ((2**bits) // 2) - 1 - - if not (min_val <= value <= max_val): - raise ValueError('The value is too big or to small. Value must be between `{}` and `{}`'.format(min_val, max_val)) - - @property - def size(self): - return self.__nbytes - - @property - def signed(self): - return self.__signed - - def read(self, buffer: IO): - return struct.unpack(self.__format, buffer.read(self.__nbytes))[0] - - def write(self, value: int, buffer: IO): - self.check_value(value) - return buffer.write(struct.pack(self.__format, value)) - - -class Byte(NumericType, nbytes=1): - pass - -class BigEndianShort(NumericType, nbytes=2): - pass - -class LittleEndianShort(NumericType, nbytes=2, order='little'): - pass - -class BigEndianInt32(NumericType, nbytes=4): - pass - -class LittleEndianInt32(NumericType, nbytes=4, order='little'): - pass - -class BigEndianInt64(NumericType, nbytes=8): - pass - -class LittleEndianInt64(NumericType, nbytes=8, order='little'): - pass - - -## Floating Types ## -class FloatingType(Type): - _python_type = float - - def __new__(cls, default=0, **kwargs): - obj = super().__new__(cls, default, **kwargs) - obj.__format = '{}{}'.format(cls.__order, cls.__ctype) - return obj - - def __init_subclass__(cls, nbytes: int, order='big'): - cls.__nbytes = nbytes - if nbytes == 2: - cls.__ctype = 'e' - elif nbytes == 4: - cls.__ctype = 'f' - elif nbytes == 8: - cls.__ctype = 'd' - else: - raise ValueError('Invalid size. Valid options are `2`,`4` or `8`') - - if order == 'big': - cls.__order = '>' - elif order == 'little': - cls.__order = '<' - else: - cls.__order = '=' - - def check_value(self, value: float): - if not isinstance(value, float): - raise TypeError('Wrong type, got `{}` expect `float`'.format(type(value).__name__)) - - # Try to pack the value to see if it will fit - struct.pack(self.__format, value) - - @property - def size(self): - return self.__nbytes - - def read(self, buffer: IO): - return struct.unpack(self.__format, buffer.read(self.__nbytes))[0] - - def write(self, value: float, buffer: IO): - self.check_value(value) - return buffer.write(struct.pack(self.__format, value)) - - -class BigEndianFloat16(FloatingType, nbytes=2): - pass - -class LittleEndianFloat16(FloatingType, nbytes=2, order='little'): - pass - -class BigEndianFloat32(FloatingType, nbytes=4): - pass - -class LittleEndianFloat32(FloatingType, nbytes=4, order='little'): - pass - -class BigEndianFloat64(FloatingType, nbytes=8): - pass - -class LittleEndianFloat64(FloatingType, nbytes=8, order='little'): - pass - - -## Other Types ## -class Bool(Type): - _python_type = bool - - def __new__(cls, default=False, **kwargs): - return super().__new__(cls, default, **kwargs) - - def check_value(self, value: bool): - if not isinstance(value, bool): - raise TypeError('Wrong type, got `{}` expect `bool`'.format(type(value).__name__)) - - def read(self, buffer: IO): - return struct.unpack('?', buffer.read(1))[0] - - def write(self, value: bool, buffer: IO): - self.check_value(value) - buffer.write(struct.pack('?', value)) - -class ArrayOf(Type): - - def __new__(cls, _type: Type, size: int, **kwargs): - obj = super().__new__(cls,default=[_type.default for _ in range(size)], **kwargs) - obj.__type = _type - obj.__size = size - return obj - - def check_value(self, value: list): - if not isinstance(value, list): - raise TypeError('Wrong type, got `{}` expect `list`'.format(type(value).__name__)) - - if len(value) != self.__size: - raise ValueError('List is too big, length is `{}` excpected `{}`'.format(len(value), self.__size)) - - def read(self, buffer: IO): - data = [] - for _ in range(self.__size): - data.append(self.__type.read(buffer)) - return data - - def write(self, value: list, buffer: IO): - self.check_value(value) - for item in value: - self.__type.write(item, buffer) - -class NullTerminatedString(Type): - _python_type = str - - def __new__(cls, encoding='ascii', **kwargs): - obj = super().__new__(cls, default='') - obj.__encoding = encoding - return obj - - @property - def encoding(self): - return self.__encoding - - def check_value(self, value: str): - if not isinstance(value, str): - raise TypeError('Wrong type, got `{}` expect `str`'.format(type(value).__name__)) - - def read(self, buffer: IO): - text = b'' - - byte = buffer.read(1) - while byte != b'\0': - text += byte - byte = buffer.read(1) - - return text.decode(self.__encoding) - - def write(self, value: str, buffer: IO): - self.check_value(value) - buffer.write(value.encode(self.__encoding) + b'\0') - -class Bytes(Type): - _python_type = bytes - - def __new__(cls, nbytes: int, default=bytes(), **kwargs): - obj = super().__new__(cls, default=default, **kwargs) - obj.__nbytes = nbytes - return obj - - def check_value(self, value: bytes): - if not isinstance(value, bytes): - raise TypeError('Wrong type, got `{}` expect `bytes`'.format(type(value).__name__)) - - if len(value) > self.__nbytes: - raise ValueError('Value is too big') - - def read(self, buffer: IO): - return buffer.read(self.__nbytes) - - def write(self, value: bytes, buffer: IO): - self.check_value(value) - buffer.write(value) diff --git a/mojang/api/net/query/__init__.py b/mojang/api/net/query/__init__.py deleted file mode 100644 index 905a879f..00000000 --- a/mojang/api/net/query/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from .client import QueryClient \ No newline at end of file diff --git a/mojang/api/net/query/client.py b/mojang/api/net/query/client.py deleted file mode 100644 index 9859bd87..00000000 --- a/mojang/api/net/query/client.py +++ /dev/null @@ -1,73 +0,0 @@ -import socket -import time - -from .packets.handshake import HandhakeRequestPacket, HandhakeResponsePacket -from .packets.stats import BasicStatsRequestPacket, BasicStatsResponsePacket, FullStatsRequestPacket, FullStatsResponsePacket - -class QueryClient: - - def __init__(self, host: str, port: int): - self.__sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - self.__sock.settimeout(2) - self.__host = (host, port) - self.__token = None - self.__session_id = int(time.time()) & 0x0F0F0F0F - - self._connect() - - def _connect(self): - self.__sock.connect(self.__host) - - # Send request - packet = HandhakeRequestPacket(id=self.__session_id) - with self.__sock.makefile('wb') as buffer: - packet.write(buffer) - - # Receive response - with self.__sock.makefile('rb') as buffer: - r_packet = HandhakeResponsePacket.read(buffer) - - # Check type and session id - if r_packet.type != 9 or r_packet.id != packet.id: - raise Exception("Error while getting the handshake") - - # Parse token to bytes - self.__token = int(r_packet.token) - - def _basic_stats(self): - # Send packet - packet = BasicStatsRequestPacket(token=self.__token, id=self.__session_id) - with self.__sock.makefile('wb') as buffer: - packet.write(buffer) - - # Receive packet - with self.__sock.makefile('rb') as buffer: - r_packet = BasicStatsResponsePacket.read(buffer) - - # Check type and session id - if r_packet.type != 0 or r_packet.id != packet.id: - raise Exception("Error while getting basic stats") - - return r_packet - - def _full_stats(self): - # Send packet - packet = FullStatsRequestPacket(token=self.__token, id=self.__session_id) - with self.__sock.makefile('wb') as buffer: - packet.write(buffer) - - # Receive packet - with self.__sock.makefile('rb') as buffer: - r_packet = FullStatsResponsePacket.read(buffer) - - # Check type and session id - if r_packet.type != 0 or r_packet.id != packet.id: - raise Exception("Error while getting full stats") - - return r_packet - - def stats(self, full=False): - if full: - return self._full_stats() - else: - return self._basic_stats() diff --git a/mojang/api/net/query/packets/__init__.py b/mojang/api/net/query/packets/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/mojang/api/net/query/packets/handshake.py b/mojang/api/net/query/packets/handshake.py deleted file mode 100644 index 31be77d7..00000000 --- a/mojang/api/net/query/packets/handshake.py +++ /dev/null @@ -1,12 +0,0 @@ -from ...common.packet import BasePacket -from ...common.types import * - -class HandhakeRequestPacket(BasePacket): - magic: int = BigEndianShort(default=0xFEFD, signed=False) - type: int = Byte(default=9) - id: int = BigEndianInt32() - -class HandhakeResponsePacket(BasePacket): - type: int = Byte() - id: int = BigEndianInt32() - token: str = NullTerminatedString() diff --git a/mojang/api/net/query/packets/stats.py b/mojang/api/net/query/packets/stats.py deleted file mode 100644 index e4e67bc5..00000000 --- a/mojang/api/net/query/packets/stats.py +++ /dev/null @@ -1,61 +0,0 @@ -from ...common.packet import BasePacket -from ...common.types import * -from .types import NullTerminatedStringList - - -class BasicStatsRequestPacket(BasePacket): - magic: int = BigEndianShort(default=0xFEFD, signed=False) - type: int = Byte(default=0) - id: int = BigEndianInt32() - token: int = BigEndianInt32() - -class BasicStatsResponsePacket(BasePacket): - type: int = Byte() - id: int = BigEndianInt32() - motd: str = NullTerminatedString() - game_type: str = NullTerminatedString() - map: str = NullTerminatedString() - num_players: str = NullTerminatedString() - max_players: str = NullTerminatedString() - host_port: int = LittleEndianShort() - host_ip: str = NullTerminatedString() - - -class FullStatsRequestPacket(BasePacket): - magic: int = BigEndianShort(default=0xFEFD, signed=False) - type: int = Byte(default=0) - id: int = BigEndianInt32() - token: int = BigEndianInt32() - padding: int = BigEndianInt32(default=0xFFFFFF01, signed=False) - -class FullStatsResponsePacket(BasePacket): - type: int = Byte() - id: int = BigEndianInt32() - is_last_packet: int = Byte() - # Padding - _pad1: int = ArrayOf(Byte(), 10) - # K.V Section - _motd: str = NullTerminatedString() - motd: str = NullTerminatedString() - _game_type: str = NullTerminatedString() - game_type: str = NullTerminatedString() - _game_id: str = NullTerminatedString() - game_id: str = NullTerminatedString() - _version: str = NullTerminatedString() - version: str = NullTerminatedString() - _plugins: str = NullTerminatedString() - plugins: str = NullTerminatedString() - _map: str = NullTerminatedString() - map: str = NullTerminatedString() - _num_players: str = NullTerminatedString() - num_players: str = NullTerminatedString() - _max_players: str = NullTerminatedString() - max_players: str = NullTerminatedString() - _host_port: str = NullTerminatedString() - host_port: str = NullTerminatedString() - _host_ip: str = NullTerminatedString() - host_ip: str = NullTerminatedString() - # Padding - _pad2: int = ArrayOf(Byte(), 11) - # Players Section - players: list = NullTerminatedStringList() diff --git a/mojang/api/net/query/packets/types.py b/mojang/api/net/query/packets/types.py deleted file mode 100644 index 0f444049..00000000 --- a/mojang/api/net/query/packets/types.py +++ /dev/null @@ -1,29 +0,0 @@ -from typing import IO - -from ...common.types import Type, NullTerminatedString - -class NullTerminatedStringList(Type): - - def __new__(cls, **kwargs): - return super().__new__(cls, default=[], **kwargs) - - def check_value(self, value: str): - if not isinstance(value, list): - raise TypeError('Wrong type, got `{}` expect `list`'.format(type(value).__name__)) - - def read(self, buffer: IO): - values = [] - string = NullTerminatedString() - current = string.read(buffer) - - while len(current) > 0: - values.append(current) - current = string.read(buffer) - - return values - - def write(self, values: list, buffer: IO): - string = NullTerminatedString() - for value in values: - string.write(value, buffer) - buffer.write(b'\0') diff --git a/mojang/api/net/rcon/__init__.py b/mojang/api/net/rcon/__init__.py deleted file mode 100644 index 973ff9b7..00000000 --- a/mojang/api/net/rcon/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from .client import RconClient \ No newline at end of file diff --git a/mojang/api/net/rcon/client.py b/mojang/api/net/rcon/client.py deleted file mode 100644 index e346e7e6..00000000 --- a/mojang/api/net/rcon/client.py +++ /dev/null @@ -1,74 +0,0 @@ -import struct -import socket - -from ..common.packet import BasePacket -from .packets.login import LoginRequestPacket, LoginResponsePacket -from .packets.command import CommandRequestPacket, CommandResponsePacket - -class RconClient: - - def __init__(self, host: str, port: int, password: str): - self.__sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.__host = (host, port) - self.__password = password - - self.__id__ = 0 - - @property - def _new_id(self): - self.__id__ += 1 - return self.__id__ - - def _recv(self, cls: BasePacket = None): - with self.__sock.makefile('rb') as stream: - packet_length = struct.unpack('>= 7 - - if value > 0: - byte |= 0x80 - - buffer.write(struct.pack('B', byte)) - - if value == 0: - break - - -class VarInt(VarType, nbytes=5): - pass - -class VarLong(VarType, nbytes=10): - pass - -## Other ## -class String(Type): - _python_type = str - - def __new__(cls, default='', encoding='utf-8', **kwargs): - obj = super().__new__(cls, default=default, **kwargs) - obj.__encoding = encoding - return obj - - @property - def encoding(self): - return self.__encoding - - def check_value(self, value: str): - if not isinstance(value, str): - raise TypeError('Wrong type, got `{}` expect `str`'.format(type(value).__name__)) - - def read(self, buffer: IO): - length = VarInt().read(buffer) - return buffer.read(length).decode(self.__encoding) - - def write(self, value: str, buffer: IO): - self.check_value(value) - VarInt().write(len(value), buffer) - buffer.write(value.encode(self.__encoding)) - -class JSONString(String): - _python_type = dict - - def read(self, buffer: IO): - return json.loads(super().read(buffer)) - - def write(self, value: dict, buffer: IO): - text = json.dumps(value) - super().write(text, buffer) diff --git a/mojang/api/net/slp/slp.py b/mojang/api/net/slp/slp.py deleted file mode 100644 index d5da9c22..00000000 --- a/mojang/api/net/slp/slp.py +++ /dev/null @@ -1,133 +0,0 @@ -import io -import json -import socket -import struct -import time - -from .packets.types import VarInt -from .packets.current import SLPCurrentHandshakePacket, SLPCurrentRequestPacket, SLPCurrentResponsePacket -from .packets.ping import SLPPingPacket, SLPPongPacket - -def _slp_current(sock: socket.socket, hostname='localhost', port=25565): - # Send handshake - packet = SLPCurrentHandshakePacket(proto_version=0, server_address=hostname, server_port=port, next_state=0x01) - with sock.makefile('wb') as buffer: - VarInt().write(len(packet.data), buffer) - packet.write(buffer) - - # Send request - packet = SLPCurrentRequestPacket() - with sock.makefile('wb') as buffer: - VarInt().write(len(packet.data), buffer) - packet.write(buffer) - - # Receive response - with sock.makefile('rb') as buffer: - length = VarInt().read(buffer) - rpacket = SLPCurrentResponsePacket.from_bytes(buffer.read(length)) - - # Ping-Pong - packet = SLPPingPacket(payload=int(time.time() * 1000)) - with sock.makefile('wb') as buffer: - VarInt().write(len(packet.data), buffer) - packet.write(buffer) - - with sock.makefile('rb') as buffer: - length = VarInt().read(buffer) - pong_packet = SLPPongPacket.from_bytes(buffer.read(length)) - - return { - 'protocol_version': int(rpacket.response['version']['protocol']), - 'version': rpacket.response['version']['name'], - 'motd': rpacket.response['description']['text'], - 'players': { - 'count': (int(rpacket.response['players']['online']), int(rpacket.response['players']['max'])), - 'list': rpacket.response['players'].get('sample', []) - }, - 'ping': int(time.time() * 1000) - pong_packet.payload - } - -def _slp_1_6(sock: socket.socket, hostname='localhost', port=25565): - # Send request - header = struct.pack('>BBB', 0xFE, 0x01, 0xFA) - command = struct.pack('>h22s', 11, 'MC|PingHost'.encode('utf-16be')) - encoded_hostname = hostname.encode('utf-16be') - rest = struct.pack('>hBh{}si'.format(len(encoded_hostname)), 7 + len(encoded_hostname), 80, len(hostname), encoded_hostname, port) - packet = header + command + rest - - sock.send(packet) - - with sock.makefile('rb') as fp: - data = fp.read() - protocol_ver, version, motd, nplayers, maxplayers = data[9:].decode('utf-16-be').split('\x00') - - return { - 'protocol_version': int(protocol_ver), - 'version': version, - 'motd': motd, - 'players': { - 'count': (int(nplayers), int(maxplayers)), - 'list': [] - }, - 'ping': None - } - -def _slp_prior_1_6(sock: socket.socket, **kwargs): - sock.send(struct.pack('>BB', 0xFE, 0x01)) - - with sock.makefile('rb') as fp: - data = fp.read() - protocol_ver, version, motd, nplayers, maxplayers = data[9:].decode('utf-16-be').split('\x00') - - return { - 'protocol_version': int(protocol_ver), - 'version': version, - 'motd': motd, - 'players': { - 'count': (int(nplayers), int(maxplayers)), - 'list': [] - }, - 'ping': None - } - -def _slp_prior_1_4(sock: socket.socket, **kwargs): - sock.send(struct.pack('>B', 0xFE)) - - with sock.makefile('rb') as fp: - data = fp.read() - motd, nplayers, maxplayers = data[3:].decode('utf-16-be').split('\xa7') - - return { - 'protocol_version': None, - 'version': None, - 'motd': motd, - 'players': { - 'count': (int(nplayers), int(maxplayers)), - 'list': [] - }, - 'ping': None - } - - -def ping(host: str, port: int): - result = None - _versions = [_slp_current, _slp_1_6, _slp_prior_1_6, _slp_prior_1_4] - - while len(_versions) > 0: - _slp = _versions.pop(0) - - try: - # Try a slp version - with socket.socket() as sock: - sock.connect((host, port)) - sock.settimeout(2) - _slp_result = _slp(sock, hostname=host, port=port) - - # Check if version worked - if _slp_result: - result = _slp_result - break - except: - pass - - return result diff --git a/mojang/api/session.py b/mojang/api/session.py deleted file mode 100644 index 26151acc..00000000 --- a/mojang/api/session.py +++ /dev/null @@ -1,71 +0,0 @@ -import datetime as dt - -import requests - -from ..error.exceptions import * -from ..utils import web -from ..utils.skin import Skin -from .urls import (CHANGE_NAME, CHECK_NAME_CHANGE, GET_PROFILE, RESET_SKIN, UPLOAD_SKIN) - - -def get_user_name_change(access_token: str) -> dict: - """Return if user can change name and when it was created - - Args: - access_token (str): The session's access token - - Returns: - A dict with the following keys: `created_at` and `name_change_allowed` - - Raises: - Unauthorized: If the access token is invalid - """ - data = web.auth_request('get', CHECK_NAME_CHANGE, access_token, exceptions=(PayloadError, Unauthorized)) - return { - 'created_at': dt.datetime.strptime(data['createdAt'], '%Y-%m-%dT%H:%M:%SZ'), - 'name_change_allowed': data['nameChangeAllowed'] - } - -def change_user_name(access_token: str, name: str): - """Change the user name - - Args: - access_token (str): The session's access token - name (str): The new user name - - Raises: - Unauthorized: If the access token is invalid - InvalidName: If the new user name is invalid - UnavailableName: If the new user name is unavailable - """ - web.auth_request('put', CHANGE_NAME.format(name=name), access_token, exceptions=(InvalidName, UnavailableName, Unauthorized)) - -def change_user_skin(access_token: str, path: str, variant='classic'): - """Change user skin - - Args: - access_token (str): The session's access token - path (str): The the path to the new skin, either local or remote - variant (str, optional): The skin variant, either `classic` or `slim` - - Raises: - Unauthorized: If the access token is invalid - """ - skin = Skin(path, variant=variant) - files = [ - ('variant', skin.variant), - ('file', (f'image.{skin.extention}', skin.data, f'image/{skin.extention}')) - ] - web.auth_request('post', UPLOAD_SKIN, access_token, exceptions=(PayloadError, Unauthorized), files=files, headers={'content-type': None}) - -def reset_user_skin(access_token: str, uuid: str): - """Reset user skin - - Args: - access_token (str): The session's access token - uuid (str): The user uuid - - Raises: - Unauthorized: If the access token is invalid - """ - web.auth_request('delete', RESET_SKIN.format(uuid=uuid), access_token, exceptions=(PayloadError, Unauthorized)) diff --git a/mojang/api/urls.py b/mojang/api/urls.py deleted file mode 100644 index 210da3b8..00000000 --- a/mojang/api/urls.py +++ /dev/null @@ -1,42 +0,0 @@ -from ..utils.web import URL - -# Base urls -MOJANG_AUTHSERVER = URL('https://authserver.mojang.com') -MOJANG_API = URL('https://api.mojang.com') -MOJANG_STATUS = URL('https://status.mojang.com') -MOJANG_SESSION = URL('https://sessionserver.mojang.com') -MINECRAFT_SERVICES = URL('https://api.minecraftservices.com') -LAUNCHER_META = URL('https://launchermeta.mojang.com') -RESOURCES = URL('http://resources.download.minecraft.net') - -# Game Files -MC_VERSIONS = LAUNCHER_META.join('mc/game/version_manifest.json') -RESOURCES_DOWNLOAD = RESOURCES.join('{}/{}') - -# Status check -STATUS_CHECK = MOJANG_STATUS.join('check') - -# Mojang authserver -AUTHENTICATE = MOJANG_AUTHSERVER.join('authenticate') -VALIDATE = MOJANG_AUTHSERVER.join('validate') -REFRESH = MOJANG_AUTHSERVER.join('refresh') -SIGNOUT = MOJANG_AUTHSERVER.join('signout') -INVALIDATE = MOJANG_AUTHSERVER.join('invalidate') - -# Security Check -SECURITY_CHECK = MOJANG_API.join('user/security/location') -SECURITY_CHALLENGES = MOJANG_API.join('user/security/challenges') - -# Other -NAME_HISTORY = MOJANG_API.join('user/profiles/{uuid}/names') -GET_UUID = MOJANG_API.join('users/profiles/minecraft/{name}') -GET_UUIDS = MOJANG_API.join('profiles/minecraft') - -CHECK_NAME_CHANGE = MINECRAFT_SERVICES.join('minecraft/profile/namechange') -CHANGE_NAME = MINECRAFT_SERVICES.join('minecraft/profile/name/{name}') - -UPLOAD_SKIN = MINECRAFT_SERVICES.join('minecraft/profile/skins') -RESET_SKIN = MOJANG_API.join('user/profile/{uuid}/skin') - -GET_PROFILE = MOJANG_SESSION.join('session/minecraft/profile/{uuid}') -GET_AUTH_PROFILE = MINECRAFT_SERVICES.join('minecraft/profile') diff --git a/mojang/main.py b/mojang/main.py deleted file mode 100644 index 2d6994a6..00000000 --- a/mojang/main.py +++ /dev/null @@ -1,99 +0,0 @@ -from .api import base -from .api.auth import yggdrasil -from .profile import UserProfile -from .session import UserSession - -from .api.files.versions import MinecraftVersions - -# Basic api -api_status = base.api_status -name_history = base.name_history - -def get_uuid(username: str) -> str: - """Get uuid of username - - Args: - username (str): The username which you want the uuid of - - Returns: - The uuid of the username - """ - return base.get_uuid(username) - -def get_uuids(usernames: list) -> list: - """Get uuid of multiple username - - Note: Limited Endpoint - The Mojang API only allow 10 usernames maximum, if more than 10 usernames are - given to the function, multiple request will be made. - - Args: - usernames (list): The list of username which you want the uuid of - - Returns: - A list of uuid - """ - return base.get_uuids(usernames) - -def get_username(uuid: str) -> str: - """Get the username of a uuid - - Args: - uuid (str): The uuid you want the username of - - Returns: - The username of the uuid - """ - profile = base.get_profile(uuid) - if profile: - return profile['name'] - - -# Connect -def connect(username: str, password: str, client_token: str = None) -> 'UserSession': - """Connect a user with name and password - - Args: - username (str): The username or email if account is not legacy - password (str): The user password - client_token (str, optional): The client token to use in the authentication (default to None) - - Returns: - A `UserSession` object - """ - auth_data = yggdrasil.authenticate_user(username, password, client_token=client_token) - return UserSession(auth_data['access_token'], auth_data['client_token']) - - -# Complete profile -def user(username: str = None, uuid: str = None) -> 'UserProfile': - """Fetch the complete profile by `uuid` or `username`. If both are given - the `username` is used - - Args: - username (str, optional): The username to fetch the profile for - uuid (str, optional): The uuid to fetch the profile for - - Returns: - A UserProfile object - """ - if username is not None: - user_data = base.get_uuid(username, only_uuid=False) - if user_data: - user_profile = base.get_profile(user_data['uuid']) - names = base.name_history(user_data['uuid']) - - return UserProfile.create(**{**user_data, **user_profile, 'names': names}) - elif uuid is not None: - user_profile = base.get_profile(uuid) - if user_profile: - user_data = base.get_uuid(user_profile['name'], only_uuid=False) - names = base.name_history(user_profile['uuid']) - - return UserProfile.create(**{**user_data, **user_profile, 'names': names}) - else: - raise Exception('You must at least provide one argument: `username` or `uuid`') - - -# Minecraft Versions -mcversions = MinecraftVersions() diff --git a/mojang/profile.py b/mojang/profile.py deleted file mode 100644 index 59ed45b2..00000000 --- a/mojang/profile.py +++ /dev/null @@ -1,49 +0,0 @@ - -class UserProfile: - """This class represent a Mojang user profile - - Attributes: - name (str): The player name - uuid (str): The player uuid - is_legacy (str): True if user has not migrated to mojang account - is_demo (str): True if user has not paid - names (str): The player name history - created_at (datetime): The player date of creation - name_change_allowed (str): True if player can change name - skins (str): The list of skins - capes (str): The list of capes - """ - - __slots__ = ('name', 'uuid', 'is_legacy', 'is_demo', 'names', 'created_at', 'name_change_allowed', 'skins', 'capes') - - @classmethod - def create(cls, **kwargs) -> 'UserProfile': - """Create a new UserProfile from a dict - - Args: - **kwargs (dict): The dict you want to create the profile from - - Returns: - A UserProfile with the attributes from the `kwargs` dict - """ - profile = UserProfile() - - for key, value in kwargs.items(): - if key in cls.__slots__: - super(cls, profile).__setattr__(key, value) - - return profile - - def __init__(self): - self.name = None - self.uuid = None - self.is_legacy = False - self.is_demo = False - - self.names = None - - self.created_at = None - self.name_change_allowed = None - - self.skins = [] - self.capes = [] diff --git a/mojang/session.py b/mojang/session.py deleted file mode 100644 index 2a3be86d..00000000 --- a/mojang/session.py +++ /dev/null @@ -1,106 +0,0 @@ -from .api import base, session -from .api.auth import security, yggdrasil -from .profile import UserProfile - - -class UserSession(UserProfile): - """This class represent a Mojang authenticated user - - Attributes: - name (str): The player name - uuid (str): The player uuid - is_legacy (str): True if user has not migrated to mojang account - is_demo (str): True if user has not paid - names (str): The player name history - created_at (datetime): The player date of creation - name_change_allowed (str): True if player can change name - skins (str): The list of skins - capes (str): The list of capes - """ - - def __init__(self, access_token: str, client_token: str): - super().__init__() - self.__access_token = access_token - self.__client_token = client_token - - self._refresh() - - def _refresh(self): - data = yggdrasil.refresh_token(self.__access_token, self.__client_token) - self.__access_token = data['access_token'] - self.__client_token = data['client_token'] - - self.uuid = data['uuid'] - self.name = data['name'] - self.is_legacy = data['legacy'] - self.is_demo = data['demo'] - - self._fetch_data() - - def _fetch_data(self): - # Get name history - self.names = base.name_history(self.uuid) - - # Check name change - data = session.get_user_name_change(self.__access_token) - self.name_change_allowed = data['name_change_allowed'] - self.created_at = data['created_at'] - - # Fetch skins/capes - data = base.get_profile(self.uuid) - # self.uuid = data['uuid'] - self.name = data['name'] - self.skins = data['skins'] - self.capes = data['capes'] - - def close(self): - """Close current session""" - yggdrasil.invalidate_token(self.__access_token, self.__client_token) - self.__access_token = None - self.__client_token = None - - # Security - @property - def secure(self) -> bool: - """Return if user IP is verified""" - return security.is_user_ip_secure(self.__access_token) - - @property - def challenges(self) -> list: - """Return the list of challenges to verify user IP""" - return security.get_user_challenges(self.__access_token) - - def verify(self, answers: list) -> bool: - """Verify user IP - - Args: - answers (list): The answers to verify the IP - """ - return security.verify_user_ip(self.__access_token, answers) - - # Name - def change_name(self, name: str): - """Change user name, and update profile data - - Args: - name (str): The new user name - """ - session.change_user_name(self.__access_token, name) - self._fetch_data() - - # Skin - def change_skin(self, path: str, variant='classic'): - """Change user skin, and update profile data - - Args: - path (str): The path to the skin - variant (str, optional): The skin variant, either `classic` or `slim` - """ - session.change_user_skin(self.__access_token, path, variant) - self._fetch_data() - - def reset_skin(self): - """Reset user skin, and update profile data""" - session.reset_user_skin(self.__access_token, self.uuid) - self._fetch_data() - diff --git a/mojang/utils/__init__.py b/mojang/utils/__init__.py deleted file mode 100644 index 49f4e587..00000000 --- a/mojang/utils/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from .cape import Cape -from .skin import Skin -from .server import Server \ No newline at end of file diff --git a/mojang/utils/cape.py b/mojang/utils/cape.py deleted file mode 100644 index 6814a277..00000000 --- a/mojang/utils/cape.py +++ /dev/null @@ -1,5 +0,0 @@ -from .web import Downloadable - - -class Cape(Downloadable): - pass diff --git a/mojang/utils/server.py b/mojang/utils/server.py deleted file mode 100644 index b39b3b0f..00000000 --- a/mojang/utils/server.py +++ /dev/null @@ -1,45 +0,0 @@ -import socket -from ..api.net import slp -from ..api.net.query import QueryClient -from ..api.net.rcon import RconClient - -class Server: - - def __init__(self, hostname: str, port: int, rcon_port=None, rcon_password=None, query_port=None): - self.__hostname = hostname - self.__port = port - self.__rcon_client = None - self.__query_client = None - if rcon_port and rcon_password: - self.__rcon_client = RconClient(self.__hostname, rcon_port, rcon_password) - if query_port: - self.__query_client = QueryClient(self.__hostname, query_port) - - def __enter__(self): - return self - - def __exit__(self, *args): - self.close() - - def close(self): - if self.__rcon_client: - self.__rcon_client.close() - - # Rcon - def connect_rcon(self): - if self.__rcon_client: - self.__rcon_client.connect() - - def run_cmd(self, command: str): - if self.__rcon_client: - return self.__rcon_client.run_cmd(command) - - # Query - def get_stats(self, full=False): - if self.__query_client: - return self.__query_client.stats(full) - - # SLP - def ping(self): - return slp.ping(self.__hostname, self.__port) - diff --git a/mojang/utils/skin.py b/mojang/utils/skin.py deleted file mode 100644 index c637918f..00000000 --- a/mojang/utils/skin.py +++ /dev/null @@ -1,12 +0,0 @@ -from .web import Downloadable - - -class Skin(Downloadable): - - def __init__(self, source: str, variant: str='classic'): - super().__init__(source) - self.__variant = variant - - @property - def variant(self): - return self.__variant diff --git a/mojang/utils/web.py b/mojang/utils/web.py deleted file mode 100644 index 3ee7e339..00000000 --- a/mojang/utils/web.py +++ /dev/null @@ -1,130 +0,0 @@ -import re -from os import path -from urllib.parse import urljoin, urlparse - -import requests -import validators -from requests.auth import AuthBase - -from ..error.handler import handle_response - - -class URL: - - def __init__(self, root: str): - self.__root = root - - def join(self, *paths): - base = self.__root - for path in paths: - base = urljoin(base, path) - return base - - @property - def root(self): - return self.__root - - -# Download -class Downloadable: - - def __init__(self, source: str): - self.__source = source - self.__filename = [] - self.__data = None - - if validators.url(self.__source): - response = download_bytes(self.__source) - if response: - self.__filename, self.__data = response - else: - raise Exception('Invalid source') - elif path.exists(self.__source): - basename = path.basename(self.__source) - self.__filename = path.splitext(basename) - self.__filename[1] = self.__filename[1][1:] - with open(self.__source, 'rb') as fp: - self.__data = fp.read() - else: - raise Exception('Invalid source') - - @property - def source(self): - return self.__source - - @property - def filename(self): - return '.'.join(self.__filename) - - @property - def name(self): - return self.__filename[0] - - @property - def extension(self): - return self.__filename[1] - - @property - def data(self): - return self.__data - - @property - def size(self): - return len(self.__data) - - def save(self, dest: str): - file_path = dest - if path.isdir(dest): - file_path = path.join(dest, self.filename) - elif not dest.endswith(self.extension): - file_path = dest + f'.{self.extension}' - - with open(file_path, 'wb') as fp: - fp.write(self.data) - -# Auth -class BearerAuth(AuthBase): - - def __init__(self, token: str): - self.__token = token - - def __call__(self, r): - r.headers['Authorization'] = 'Bearer {}'.format(self.__token) - return r - -# Download -def filename_from_url(url: str): - url_path = urlparse(url).path - match = re.match('^([\w,\s-]+)\.([A-Za-z]{3})$', path.basename(url_path)) - if match: - return match.groups() - -def filename_from_headers(headers: dict): - # Check content-disposition - if 'content-disposition' in headers.keys(): - cdisp = headers['content-disposition'] - file_names = re.findall('filename=(.+)', cdisp) - if len(file_names) > 0: - return file_names[0][0], file_names[0][1][1:] - - # Check content-type - if 'content-type' in headers.keys(): - ctype = headers['content-type'] - if (not 'text' in ctype) and (not 'html' in ctype): - return ctype.split('/') - -def download_bytes(url: str): - response = requests.get(url) - if response.ok: - filename = filename_from_headers(response.headers) or filename_from_url(url) or ['download', None] - return filename, response.content - -# Request -def request(method: str, url: str, exceptions=[], **kwargs): - method_fct = getattr(requests, method) - response = method_fct(url, **kwargs) - data = handle_response(response, *exceptions) - return data - -def auth_request(method: str, url: str, token: str, exceptions=[], **kwargs): - return request(method, url, exceptions=exceptions, auth=BearerAuth(token), **kwargs)