Skip to content

Commit

Permalink
Upgrading tests (#126)
Browse files Browse the repository at this point in the history
* Small tests refactoring, cover vk.exceptions

* Tox fix

* Empty ACCESS_TOKEN not a token

* Cover vk.utils

* Refactoring and covering vk.session.InteractiveMixin

* Remove mocking requests objects and old vk.API (UserAPI) test
  • Loading branch information
YariKartoshe4ka authored Jun 5, 2022
1 parent ff2033b commit 6d094b5
Show file tree
Hide file tree
Showing 10 changed files with 169 additions and 270 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ on:
push:

schedule:
- cron: "0 12 * * *"
- cron: "0 12 */2 * *"

jobs:
test:
Expand Down Expand Up @@ -38,6 +38,8 @@ jobs:
run: tox -vv --notest
- name: Run test suite
run: tox --skip-pkg-install
env:
VK_ACCESS_TOKEN: ${{ secrets.VK_ACCESS_TOKEN }}
- name: Upload coverage
uses: codecov/codecov-action@v2

Expand Down
34 changes: 18 additions & 16 deletions src/vk/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
from enum import IntEnum

# API Error Codes
AUTHORIZATION_FAILED = 5 # Invalid access token
PERMISSION_IS_DENIED = 7
CAPTCHA_IS_NEEDED = 14
ACCESS_DENIED = 15 # No access to call this method
INVALID_USER_ID = 113 # User deactivated

class ErrorCodes(IntEnum):
"""VK API error codes"""

AUTHORIZATION_FAILED = 5 # Invalid access token
PERMISSION_IS_DENIED = 7
CAPTCHA_NEEDED = 14
ACCESS_DENIED = 15 # No access to call this method
INVALID_USER_ID = 113 # User deactivated


class VkException(Exception):
Expand All @@ -18,9 +22,6 @@ class VkAuthError(VkException):
class VkAPIError(VkException):
__slots__ = ['error', 'code', 'message', 'request_params', 'redirect_uri']

CAPTCHA_NEEDED = 14
ACCESS_DENIED = 15

def __init__(self, error_data):
super(VkAPIError, self).__init__()
self.error_data = error_data
Expand All @@ -31,15 +32,16 @@ def __init__(self, error_data):

@staticmethod
def get_pretty_request_params(error_data):
request_params = error_data.get('request_params', ())
request_params = {param['key']: param['value'] for param in request_params}
return request_params
return {
param['key']: param['value']
for param in error_data.get('request_params', ())
}

def is_access_token_incorrect(self):
return self.code == self.ACCESS_DENIED and 'access_token' in self.message
return self.code in (ErrorCodes.AUTHORIZATION_FAILED, ErrorCodes.ACCESS_DENIED)

def is_captcha_needed(self):
return self.code == self.CAPTCHA_NEEDED
return self.code == ErrorCodes.CAPTCHA_NEEDED

@property
def captcha_sid(self):
Expand All @@ -50,7 +52,7 @@ def captcha_img(self):
return self.error_data.get('captcha_img')

def __str__(self):
error_message = '{self.code}. {self.message}. request_params = {self.request_params}'.format(self=self)
error_message = f'{self.code}. {self.message}. request_params = {self.request_params}'
if self.redirect_uri:
error_message += ',\nredirect_uri = "{self.redirect_uri}"'.format(self=self)
error_message += f',\nredirect_uri = "{self.redirect_uri}"'
return error_message
134 changes: 69 additions & 65 deletions src/vk/session.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import getpass
import logging
import re
import urllib
from json import loads
from re import findall

import requests

from .api import APINamespace
from .exceptions import VkAPIError, VkAuthError
from .utils import json_iter_parse, stringify
from .utils import stringify

logger = logging.getLogger('vk')

Expand Down Expand Up @@ -47,27 +49,22 @@ def send(self, request):
# todo Replace with something less exceptional
response.raise_for_status()

# TODO: there are may be 2 dicts in one JSON
# for example: "{'error': ...}{'response': ...}"
for response_or_error in json_iter_parse(response.text):
request.response = response_or_error
response_or_error = loads(response.text)
request.response = response_or_error

if 'response' in response_or_error:
# todo Can we have error and response simultaneously
# for error in errors:
# logger.warning(str(error))
return response_or_error['response']
if 'response' in response_or_error:
# todo Can we have error and response simultaneously
# for error in errors:
# logger.warning(str(error))
return response_or_error['response']

elif 'error' in response_or_error:
api_error = VkAPIError(request.response['error'])
request.api_error = api_error
return self.handle_api_error(request)
elif 'error' in response_or_error:
api_error = VkAPIError(request.response['error'])
request.api_error = api_error
return self.handle_api_error(request)

def prepare_request(self, request):
request.method_params.setdefault('access_token', self.access_token)

def get_access_token(self):
raise NotImplementedError
def prepare_request(self, request): # noqa: U100
pass

def handle_api_error(self, request):
logger.error('Handle API error: %s', request.api_error)
Expand All @@ -77,31 +74,15 @@ def handle_api_error(self, request):

return api_error_handler(request)

def on_api_error_14(self, request):
"""
14. Captcha needed
"""
request.method_params['captcha_key'] = self.get_captcha_key(request)
request.method_params['captcha_sid'] = request.api_error.captcha_sid

return self.send(request)

def on_api_error_15(self, request):
"""
15. Access denied
- due to scope
"""
logger.error('Authorization failed. Access token will be dropped')

del request.method_params['access_token']
self.access_token = self.get_access_token()

return self.send(request)

def on_api_error(self, request):
logger.error('API error: %s', request.api_error)
raise request.api_error


class API(APIBase):
def __init__(self, access_token, **kwargs):
super().__init__(**kwargs)
self.access_token = access_token

def get_captcha_key(self, request):
"""
Default behavior on CAPTCHA is to raise exception
Expand All @@ -110,18 +91,24 @@ def get_captcha_key(self, request):
# request.api_error.captcha_img
raise request.api_error

def on_api_error_14(self, request):
"""
14. Captcha needed
"""
request.method_params['captcha_key'] = self.get_captcha_key(request)
request.method_params['captcha_sid'] = request.api_error.captcha_sid

class API(APIBase):
def __init__(self, access_token, **kwargs):
super().__init__(**kwargs)
self.access_token = access_token
return self.send(request)

def prepare_request(self, request):
request.method_params.setdefault('access_token', self.access_token)


class UserAPI(APIBase):
LOGIN_URL = 'https://m.vk.com'
AUTHORIZE_URL = 'https://oauth.vk.com/authorize'

def __init__(self, user_login='', user_password='', app_id=None, scope='offline', **kwargs):
def __init__(self, user_login=None, user_password=None, app_id=None, scope='offline', **kwargs):
super().__init__(**kwargs)

self.user_login = user_login
Expand All @@ -133,7 +120,7 @@ def __init__(self, user_login='', user_password='', app_id=None, scope='offline'

@staticmethod
def get_form_action(response):
form_action = re.findall(r'<form(?= ).* action="(.+)"', response.text)
form_action = findall(r'<form(?= ).* action="(.+)"', response.text)
if form_action:
return form_action[0]
else:
Expand Down Expand Up @@ -220,6 +207,18 @@ def process_auth_url_queries(self, url_queries):
self.user_id = url_queries.get('user_id')
return url_queries.get('access_token')

def on_api_error_15(self, request):
"""
15. Access denied
- due to scope
"""
logger.error('Authorization failed. Access token will be dropped')

del request.method_params['access_token']
self.access_token = self.get_access_token()

return self.send(request)


class CommunityAPI(UserAPI):
def __init__(self, *args, **kwargs):
Expand Down Expand Up @@ -254,34 +253,39 @@ def prepare_request(self, request):

class InteractiveMixin:

def get_user_login(self):
user_login = input('VK user login: ')
return user_login.strip()
def __setattr__(self, name, value):
if name in dir(self.__class__) and not value:
return

def get_user_password(self):
import getpass
object.__setattr__(self, name, value)

user_password = getpass.getpass('VK user password: ')
return user_password
@property
def user_login(self):
if not hasattr(self, '_cached_user_login'):
self._cached_user_login = input('VK user login: ')
return self._cached_user_login

def get_access_token(self):
logger.debug('InteractiveMixin.get_access_token()')
access_token = super().get_access_token()
if not access_token:
access_token = input('VK API access token: ')
return access_token
@property
def user_password(self):
if not hasattr(self, '_cached_user_password'):
self._cached_user_password = getpass.getpass('VK user password: ')
return self._cached_user_password

@property
def access_token(self):
if not hasattr(self, '_cached_access_token'):
self._cached_access_token = input('VK API access token: ')
return self._cached_access_token

def get_captcha_key(self, captcha_image_url):
"""
Read CAPTCHA key from shell
"""
print('Open CAPTCHA image url: ', captcha_image_url)
captcha_key = input('Enter CAPTCHA key: ')
return captcha_key
return input('Enter CAPTCHA key: ')

def get_auth_check_code(self):
"""
Read Auth code from shell
"""
auth_check_code = input('Auth check code: ')
return auth_check_code.strip()
return input('Auth check code: ')
30 changes: 1 addition & 29 deletions src/vk/utils.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,8 @@
import logging
from typing import Iterable

logger = logging.getLogger('vk')

STRING_LIKE_TYPES = (str, bytes, bytearray)


try:
import simplejson as json
except ImportError:
import json


def json_iter_parse(response_text):
decoder = json.JSONDecoder(strict=False)
idx = 0
while idx < len(response_text):
obj, idx = decoder.raw_decode(response_text, idx)
yield obj


def stringify(value):
if isinstance(value, Iterable) and not isinstance(value, STRING_LIKE_TYPES):
return ','.join(map(str, value))
Expand All @@ -30,18 +13,7 @@ def stringify_values(dictionary):
return {key: stringify(value) for key, value in dictionary.items()}


# class LoggingSession(requests.Session):
# def request(self, method, url, **kwargs):
# logger.debug('Request: %s %s, params=%r, data=%r', method, url, kwargs.get('params'), kwargs.get('data'))
# response = super(LoggingSession, self).request(method, url, **kwargs)
# logger.debug('Response: %s %s', response.status_code, response.url)
# return response


def censor_access_token(access_token):
if isinstance(access_token, str) and len(access_token) >= 12:
return '{}***{}'.format(access_token[:4], access_token[-4:])
elif access_token:
return '***'
else:
return access_token
return '***'
Loading

0 comments on commit 6d094b5

Please sign in to comment.