From 0f6fb049e224398ae126ca2bf03cfeced8707480 Mon Sep 17 00:00:00 2001 From: mikeqfu Date: Fri, 12 Jun 2020 12:53:21 +0100 Subject: [PATCH] integrated download.py into ops.py --- pyhelpers/ops.py | 63 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 63 insertions(+) diff --git a/pyhelpers/ops.py b/pyhelpers/ops.py index 1d2b874..ad64531 100644 --- a/pyhelpers/ops.py +++ b/pyhelpers/ops.py @@ -5,11 +5,74 @@ import inspect import itertools import math +import os import re +import time import types import numpy as np import pandas as pd +import tqdm + + +def download_file_from_url(url, path_to_file, wait_to_retry=3600, **kwargs): + """ + Download an object available at the given ``url``. + See also [`DFFU-1 `_]. + + :param url: URL + :type url: str + :param path_to_file: a full path to which the downloaded object is saved as + :type path_to_file: str + :param wait_to_retry: a wait time to retry downloading, defaults to ``3600`` (in second) + :type wait_to_retry: int, float + :param kwargs: optional arguments of `io.open `_ + + :Example: + + .. code-block:: python + + from pyhelpers.dir import cd + + wait_to_retry = 3600 + + url = 'https://www.python.org/static/community_logos/python-logo-master-v3-TM.png' + path_to_file = cd("tests\\images", "python-logo.png") + + download_file_from_url(url, path_to_file, wait_to_retry) + """ + + import requests + import fake_useragent + + headers = {'User-Agent': fake_useragent.UserAgent().random} + resp = requests.get(url, stream=True, headers=headers) # Streaming, so we can iterate over the response + + if resp.status_code == 429: + time.sleep(wait_to_retry) + + total_size = int(resp.headers.get('content-length')) # Total size in bytes + block_size = 1024 * 1024 + wrote = 0 + + directory = os.path.dirname(path_to_file) + if directory == "": + path_to_file = os.path.join(os.getcwd(), path_to_file) + else: + if not os.path.exists(directory): + os.makedirs(directory) + + with open(path_to_file, mode='wb', **kwargs) as f: + for data in tqdm.tqdm(resp.iter_content(block_size, decode_unicode=True), + total=total_size // block_size, unit='MB'): + wrote = wrote + len(data) + f.write(data) + f.close() + + resp.close() + + if total_size != 0 and wrote != total_size: + print("ERROR, something went wrong!") def confirmed(prompt=None, resp=False, confirmation_required=True):