Add instantiate_requests_session()
mikeqfu committed Sep 17, 2021
1 parent ff01488 commit 82a3079
Showing 1 changed file with 143 additions and 85 deletions.
228 changes: 143 additions & 85 deletions pyhelpers/ops.py
@@ -1057,7 +1057,7 @@ def colour_bar_index(cmap, n_colours, labels=None, **kwargs):

def is_network_connected():
    """
    Check whether the current machine can connect to the Internet.

    :return: whether the Internet connection is currently working
    :rtype: bool
@@ -1078,7 +1078,7 @@ def is_network_connected():

def is_url_connectable(url):
    """
    Check whether the current machine can connect to a given URL.

    :param url: a URL
    :type url: str
@@ -1108,6 +1108,92 @@ def is_url_connectable(url):
    return False


def is_downloadable(url):
    """
    Check whether a URL contains a downloadable resource.

    :param url: a valid URL
    :type url: str
    :return: whether the URL leads to a downloadable resource
    :rtype: bool

    **Example**::

        >>> from pyhelpers.ops import is_downloadable

        >>> logo_url = 'https://www.python.org/static/community_logos/python-logo-master-v3-TM.png'
        >>> is_downloadable(logo_url)
        True

        >>> google_url = 'https://www.google.co.uk/'
        >>> is_downloadable(google_url)
        False
    """

    h = requests.head(url, allow_redirects=True)

    # Default to '' so a missing Content-Type header does not raise AttributeError
    content_type = h.headers.get('content-type', '').lower()

    if content_type.startswith('text/html'):
        downloadable = False
    else:
        downloadable = True

    return downloadable
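
For reference, the same content-type heuristic can be reproduced with ``requests`` alone; a minimal sketch (the URL and the ``timeout`` value here are illustrative assumptions, not part of the commit)::

    import requests

    example_url = 'https://www.google.co.uk/'  # placeholder URL
    # A HEAD request retrieves headers only, following any redirects
    h = requests.head(example_url, allow_redirects=True, timeout=10)
    # 'text/html' indicates a web page rather than a downloadable file
    print(not h.headers.get('content-type', '').lower().startswith('text/html'))
    # False, i.e. not downloadable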


def instantiate_requests_session(url, max_retries=5, backoff_factor=0.1, retry_status='default',
                                 **kwargs):
    """
    Instantiate a requests session.

    :param url: a valid URL
    :type url: str
    :param max_retries: maximum number of retries, defaults to ``5``
    :type max_retries: int
    :param backoff_factor: ``backoff_factor`` of `urllib3.util.retry.Retry()`_, defaults to ``0.1``
    :type backoff_factor: float
    :param retry_status: a list of HTTP status codes that force a retry,
        inherited from ``status_forcelist`` of `urllib3.util.retry.Retry()`_;
        when ``retry_status='default'``, the list defaults to ``[429, 500, 502, 503, 504]``
    :type retry_status: list or str
    :param kwargs: optional parameters (except ``backoff_factor`` and ``status_forcelist``)
        of `urllib3.util.retry.Retry()`_
    :return: a requests session
    :rtype: `requests.Session`_

    .. _urllib3.util.retry.Retry():
        https://urllib3.readthedocs.io/en/latest/reference/urllib3.util.html#urllib3.util.Retry
    .. _requests.Session:
        https://2.python-requests.org/en/master/api/#request-sessions

    **Example**::

        >>> from pyhelpers.ops import instantiate_requests_session

        >>> logo_url = 'https://www.python.org/static/community_logos/python-logo-master-v3-TM.png'
        >>> s = instantiate_requests_session(logo_url)

        >>> type(s)
        requests.sessions.Session
    """

    if retry_status == 'default':
        codes_for_retries = [429, 500, 502, 503, 504]
    else:
        codes_for_retries = copy.copy(retry_status)

    retries = urllib3.util.retry.Retry(
        total=max_retries, backoff_factor=backoff_factor, status_forcelist=codes_for_retries,
        **kwargs)

    session = requests.Session()

    session.mount(
        prefix='https://' if url.startswith('https:') else 'http://',
        adapter=requests.adapters.HTTPAdapter(max_retries=retries))

    return session
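
A usage sketch of the new helper (``timeout`` is an assumed pass-through option of `requests.Session.get()`_, not something this commit sets)::

    from pyhelpers.ops import instantiate_requests_session

    logo_url = 'https://www.python.org/static/community_logos/python-logo-master-v3-TM.png'
    s = instantiate_requests_session(logo_url, max_retries=3, backoff_factor=0.2)
    # The mounted HTTPAdapter retries transparently on 429/500/502/503/504,
    # sleeping roughly backoff_factor * 2 ** (retry - 1) seconds between attempts
    r = s.get(logo_url, timeout=30)
    print(r.status_code)  # 200 on success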


def fake_requests_headers(randomized=False, **kwargs):
    """
    Make fake HTTP headers for `requests.get
@@ -1144,6 +1230,8 @@ def fake_requests_headers(randomized=False, **kwargs):
        ua = fake_user_agent.random if randomized else fake_user_agent['google chrome']

    except fake_useragent.FakeUserAgentError:
        if len(kwargs) > 1:
            kwargs.update({'use_cache_server': False, 'verify_ssl': False})
        fake_user_agent = fake_useragent.UserAgent(**kwargs)

        if randomized:
@@ -1156,40 +1244,43 @@ def fake_requests_headers(randomized=False, **kwargs):
    return fake_headers
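
A quick sketch of the patched function in use (the printed header is illustrative; the exact user-agent string varies per run)::

    from pyhelpers.ops import fake_requests_headers

    # randomized=True picks a random user agent via fake_useragent
    fake_headers = fake_requests_headers(randomized=True)
    print(fake_headers)  # e.g. {'user-agent': 'Mozilla/5.0 (Windows NT 10.0; ...)'}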


def _download_with_tqdm(response, path_to_file):
    import tqdm

    # Default to 0 in case the server omits the Content-Length header
    file_size = int(response.headers.get('content-length', 0))  # total size in bytes

    block_size = 1024 * 1024
    if file_size >= block_size:
        chunk_size, unit = block_size, 'MB'
    else:
        chunk_size, unit = 1024, 'KB'
    total_iter = file_size // chunk_size

    iterable = response.iter_content(chunk_size=chunk_size, decode_unicode=True)
    progress = tqdm.tqdm(iterable=iterable, total=total_iter, unit=unit, unit_scale=True)

    f = open(path_to_file, mode='wb')
    written = 0
    for dat in progress:
        if dat:
            try:
                f.write(dat)
            except TypeError:
                f.write(dat.encode())
            f.flush()
            progress.update(len(dat))
            progress.refresh()
            written = written + len(dat)

    progress.close()
    f.close()

    if file_size != 0 and written != file_size:
        print("ERROR! Something went wrong!")


def download_file_from_url(url, path_to_file, max_retries=5, random_header=True, verbose=False,
                           rsr_args=None, frh_args=None, **kwargs):
    """
    Download an object available at a valid URL.
@@ -1202,15 +1293,19 @@ def download_file_from_url(url, path_to_file, max_retries=5, random_header=True,
    :type url: str
    :param path_to_file: a path where the downloaded object is saved, or a filename
    :type path_to_file: str
    :param max_retries: maximum number of retries, defaults to ``5``
    :type max_retries: int
    :param random_header: whether to go for a random user agent, defaults to ``True``
    :type random_header: bool
    :param verbose: whether to print relevant information in console, defaults to ``False``
    :type verbose: bool or int
    :param rsr_args: optional parameters used by
        :py:func:`instantiate_requests_session()<pyhelpers.ops.instantiate_requests_session>`,
        defaults to ``None``
    :type rsr_args: dict or None
    :param frh_args: optional parameters used by
        :py:func:`fake_requests_headers()<pyhelpers.ops.fake_requests_headers>`, defaults to ``None``
    :type frh_args: dict or None
    :param kwargs: optional parameters of `requests.Session.get()`_

    .. _requests.Session.get():
@@ -1258,76 +1353,39 @@ def download_file_from_url(url, path_to_file, max_retries=5, random_header=True,

    if rsr_args is None:
        rsr_args = {}
    session = instantiate_requests_session(url=url, max_retries=max_retries, **rsr_args)

    if frh_args is None:
        frh_args = {}
    try:
        fake_headers = fake_requests_headers(randomized=random_header, **frh_args)
    except IndexError:  # Try again
        fake_headers = fake_requests_headers(randomized=random_header, **frh_args)

    # Streaming, so we can iterate over the response
    response = session.get(url, stream=True, headers=fake_headers, **kwargs)

    path_to_dir = os.path.dirname(path_to_file)
    if path_to_dir == "":
        path_to_file_ = os.path.join(os.getcwd(), path_to_file)
        path_to_dir = os.path.dirname(path_to_file_)
    else:
        path_to_file_ = copy.copy(path_to_file)

    if not os.path.exists(path_to_dir):
        os.makedirs(path_to_dir)

    if verbose:
        _download_with_tqdm(response=response, path_to_file=path_to_file_)

    else:
        f = open(file=path_to_file_, mode='wb')
        shutil.copyfileobj(response.raw, f)
        f.close()

    if os.stat(path=path_to_file_).st_size == 0:
        print("ERROR! Something went wrong! Check if the URL is downloadable.")

    response.close()
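
A usage sketch of the refactored function (the destination filename is arbitrary; ``rsr_args``/``frh_args`` pass options through to the two new helpers above)::

    from pyhelpers.ops import download_file_from_url

    logo_url = 'https://www.python.org/static/community_logos/python-logo-master-v3-TM.png'
    # verbose=True streams the download with a tqdm progress bar
    download_file_from_url(logo_url, 'python-logo.png', verbose=True,
                           rsr_args={'backoff_factor': 0.2})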
