From ffff1a0394a561003de5269b94deb7ae010dc842 Mon Sep 17 00:00:00 2001 From: Arthur Williams Date: Tue, 14 May 2024 01:05:55 -0700 Subject: [PATCH] fixup! crunchroll: respond to crunchryoll site upgrade --- amt/servers/crunchyroll.py | 145 ++++++++++++++----------------------- 1 file changed, 53 insertions(+), 92 deletions(-) diff --git a/amt/servers/crunchyroll.py b/amt/servers/crunchyroll.py index d74c8d7..546fad3 100644 --- a/amt/servers/crunchyroll.py +++ b/amt/servers/crunchyroll.py @@ -17,33 +17,38 @@ class GenericCrunchyrollServer(Server): domain = "crunchyroll.com" base_url = f"https://www.{domain}" api_base = f"https://api.{domain}" - login_url = f"{api_base}/login.1.json" + token_url = f"{base_url}/auth/v1/token" + login_url = token_url crunchyroll_lock = RLock() - session_id_may_be_invalid = True - _client_id = ('cr_web', 'noaihdevm_6iyg0a8l0q') - _basic_auth = None _auth_headers = None _auth_refresh = 0 + _BASIC_AUTH = 'Basic ' + base64.b64encode(':'.join(( + 't-kdgp2h8c3jub8fn0fq', + 'yfLDfMfrYvKXh4JXS1LEI2cCqu1v5Wan', + )).encode()).decode() + @property def is_logged_in(self): - return bool(self.session_get_cookie("etp_rt")) - - def get_session_id(self, force=False): - query = { - 'sess_id': 1, - 'device_id': 'whatvalueshouldbeforweb', - 'device_type': 'com.crunchyroll.static', - 'access_token': 'giKq5eY27ny3cqz', - 'referer': f'{self.base_url}/welcome/login' - } - with GenericCrunchyrollServer.crunchyroll_lock: - upsell_response = self.session_get( - f'{self.api_base}/get_upsell_data.0.json?{urlencode(query)}') - print(upsell_response.json()) - return upsell_response.json()['data']['session_id'] + return bool(self.refresh_token) + + @property + def refresh_token(self): + return self.session_get_cookie("refresh_token") + + @refresh_token.setter + def refresh_token(self, value): + return self.session_set_cookie("refresh_token", value) + + @property + def is_premium(self): + return True + for premium_cookies_name in ["crplusctamembership", "premplusctav"]: + if self.session_get_cookie(premium_cookies_name): + return True + return False def session_get_json(self, url, mem_cache=False, skip_cache=True, **kwargs): self.update_auth() @@ -58,39 +63,42 @@ def get_auth_headers(self): self.update_auth() return GenericCrunchyrollServer._auth_headers + def set_auth_info(self, data): + GenericCrunchyrollServer._auth_headers = {"Authorization": data["token_type"] + " " + data["access_token"], "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.20 Safari/537.36"} + GenericCrunchyrollServer._auth_refresh = time.time() + data.get("expires_in", 300) - 10 + def update_auth(self): with GenericCrunchyrollServer.crunchyroll_lock: if GenericCrunchyrollServer._auth_headers and GenericCrunchyrollServer._auth_refresh > time.time(): return - if not GenericCrunchyrollServer._basic_auth: - cx_api_param = self._client_id[self.is_logged_in] - GenericCrunchyrollServer._basic_auth = 'Basic ' + base64.b64encode(f'{cx_api_param}:'.encode()).decode() - auth_headers = {'Authorization': GenericCrunchyrollServer._basic_auth} - if self.is_logged_in: - grant_type = 'etp_rt_cookie' + auth_headers = {"Authorization": GenericCrunchyrollServer._BASIC_AUTH} + if self.refresh_token: + data = { + "refresh_token": self.refresh_token, + "grant_type": "refresh_token", + "scope": "offline_access", + } else: - grant_type = 'client_id' - auth_headers['ETP-Anonymous-ID'] = uuid.uuid4() - - data = { - "grant_type": grant_type.encode(), - "device_type": "Firefox on Linux", - "device_id": "c6ea9e3d-bdce-41c5-95eb-cf951e9b5667" - } - auth_response = self.session_post(f'{self.base_url}/auth/v1/token', headers=auth_headers, data=data).json() - GenericCrunchyrollServer._auth_headers = {'Authorization': auth_response['token_type'] + ' ' + auth_response['access_token']} - GenericCrunchyrollServer._auth_refresh = time.time() + auth_response.get("expires_in", 300) - 10 + data = {"grant_type": "client_id"} + auth_headers["ETP-Anonymous-ID"] = str(uuid.uuid4()) + + auth_response = self.session_post(self.token_url, headers=auth_headers, data=data).json() + self.set_auth_info(auth_response) def login(self, username, password): - session_id = self.get_session_id(True) - self.session_post(self.login_url, - data={ - "account": username, - "password": password, - "session_id": session_id, - }) - self.update_auth() + # session_id = self.get_session_id(True) + r = self.session_post(self.login_url, + data={ + "username": username, + "password": password, + "grant_type": "password", + "scope": "offline_access", + }, + headers={'Authorization': self._BASIC_AUTH}) + data = r.json() + self.refresh_token = data["refresh_token"] + self.set_auth_info(data) return True @@ -120,50 +128,6 @@ def get_config(self): def get_api_domain(self): return self.get_config()['cxApiParams']['apiDomain'] - @property - def is_premium(self): - for premium_cookies_name in ["crplusctamembership", "premplusctav"]: - if self.session_get_cookie(premium_cookies_name): - return True - return False - - def needs_authentication(self): - return not self.session_get_cookie("etp_rt") - - def init_auth_headers(self): - if self.session_get_cookie("etp_rt"): - grant_type, key = 'etp_rt_cookie', 'accountAuthClientId' - else: - grant_type, key = 'client_id', 'anonClientId' - - config = self.get_config() - auth_token = 'Basic ' + str(base64.b64encode(('%s:' % config['cxApiParams'][key]).encode('ascii')), 'ascii') - headers = {'Authorization': auth_token, "Content-Type": "application/x-www-form-urlencoded"} - - auth_response = self.session_get_json(f'{self.get_api_domain()}/auth/v1/token', post=True, headers=headers, data=f'grant_type={grant_type}'.encode('ascii')) - return {'Authorization': auth_response['token_type'] + ' ' + auth_response['access_token']} - - def get_auth_headers(self): - if not self.auth_header: - self.auth_header = self.init_auth_headers() - return self.auth_header - - def _get_params(self): - policy_response = self.session_get_json(f'{self.get_api_domain()}/index/v2', headers=self.get_auth_headers()) - cms = policy_response.get('cms_web') - bucket = cms['bucket'] - params = { - 'Policy': cms['policy'], - 'Signature': cms['signature'], - 'Key-Pair-Id': cms['key_pair_id'] - } - return (bucket, params) - - def get_params(self): - if not self.params: - self.params = self._get_params() - return self.params - def get_media_list(self, **kwargs): return self.search_for_media(None, **kwargs) @@ -216,8 +180,6 @@ def update_media_data(self, media_data, **kwargs): self.update_chapter_data(media_data, id=chapter_id, number=chapter["episode_number"], title=chapter["title"], premium=chapter["is_premium_only"], special=chapter["is_clip"], alt_id=chapter["slug_title"]) def get_stream_urls(self, media_data=None, chapter_data=None): - bucket, params = self.get_params() - url = f"https://cr-play-service.prd.crunchyrollsvc.com/v1/{chapter_data['id']}/console/switch/play" data = self.session_get_json(url, key=url, mem_cache=True, headers=self.get_auth_headers()) @@ -230,10 +192,9 @@ def get_stream_urls(self, media_data=None, chapter_data=None): return map(lambda x: [x[1]], url_list) def get_subtitle_info(self, media_data, chapter_data): - bucket, params = self.get_params() - url = f"https://cr-play-service.prd.crunchyrollsvc.com/v1/{chapter_data['id']}/console/switch/play" data = self.session_get_json(url, key=url, mem_cache=True, headers=self.get_auth_headers()) + for subInfo in data["subtitles"].values(): yield subInfo["language"], subInfo["url"], subInfo["format"], False