diff --git a/pyrcs/utils.py b/pyrcs/utils.py index 2173966..c5367a3 100644 --- a/pyrcs/utils.py +++ b/pyrcs/utils.py @@ -2,115 +2,85 @@ import collections import datetime -import numbers import os -import pickle import re import urllib.parse import bs4 import dateutil.parser +import fake_useragent import measurement.measures import numpy as np import pandas as pd import pkg_resources -import pyhelpers.ops -import pyhelpers.store -import rapidjson import requests +from pyhelpers.ops import confirmed +from pyhelpers.store import load_json, save_json -# ==================================================================================================================== -""" Change directory """ +# -- Directory --------------------------------------------------------------- -# Change directory to "dat" and sub-directories -def cd_dat(*sub_dir, dat_dir="dat", mkdir=False) -> str: +def cd_dat(*sub_dir, dat_dir="dat", mkdir=False, **kwargs): """ - :param sub_dir: [str] - :param dat_dir: [str] (default: "dat") - :param mkdir: [bool] (default: False) - :return: [str] - """ - path = pkg_resources.resource_filename(__name__, dat_dir) - for x in sub_dir: - path = os.path.join(path, x) - if mkdir: - os.makedirs(path, exist_ok=True) - return path - + Change directory to `dat_dir/` and sub-directories within a package. -# ==================================================================================================================== -""" Save data """ + :param sub_dir: name of directory; names of directories (and/or a filename) + :type sub_dir: str + :param dat_dir: name of a directory to store data, defaults to ``"dat"`` + :type dat_dir: str + :param mkdir: whether to create a directory, defaults to ``False`` + :type mkdir: bool + :param kwargs: optional parameters of `os.makedirs`_, e.g. ``mode=0o777`` + :return: a full path to a directory (or a file) under ``data_dir`` + :rtype: str + .. _`os.makedirs`: https://docs.python.org/3/library/os.html#os.makedirs -# Save Pickle file -def save_pickle(pickle_data, path_to_pickle, verbose=True): - """ - :param pickle_data: any object that could be dumped by the 'pickle' package - :param path_to_pickle: [str] local file path - :param verbose: [bool] (default: True) - :return: whether the data has been successfully saved - """ - pickle_filename = os.path.basename(path_to_pickle) - pickle_dir = os.path.basename(os.path.dirname(path_to_pickle)) - pickle_dir_parent = os.path.basename(os.path.dirname(os.path.dirname(path_to_pickle))) + **Example**:: - if verbose: - print("{} \"{}\" ... ".format("Updating" if os.path.isfile(path_to_pickle) else "Saving", - " - ".join([pickle_dir_parent, pickle_dir, pickle_filename])), end="") + from pyrcs.utils import cd_dat - try: - os.makedirs(os.path.dirname(os.path.abspath(path_to_pickle)), exist_ok=True) - pickle_out = open(path_to_pickle, 'wb') - pickle.dump(pickle_data, pickle_out) - pickle_out.close() - print("Successfully.") if verbose else None - except Exception as e: - print("Failed. {}.".format(e)) + dat_dir = "dat" + mkdir = False - -# Save JSON file -def save_json(json_data, path_to_json, verbose=True): - """ - :param json_data: any object that could be dumped by the 'json' package - :param path_to_json: [str] local file path - :param verbose: [bool] (default: True) - :return: whether the data has been successfully saved + cd_dat("line-data", dat_dir=dat_dir, mkdir=mkdir) + # "\\dat\\line-data" """ - json_filename = os.path.basename(path_to_json) - json_dir = os.path.basename(os.path.dirname(path_to_json)) - json_dir_parent = os.path.basename(os.path.dirname(os.path.dirname(path_to_json))) - - print("{} \"{}\" ... ".format("Updating" if os.path.isfile(path_to_json) else "Saving", - " - ".join([json_dir_parent, json_dir, json_filename])), end="") if verbose else None - try: - os.makedirs(os.path.dirname(os.path.abspath(path_to_json)), exist_ok=True) - json_out = open(path_to_json, 'w') - rapidjson.dump(json_data, json_out) - json_out.close() - print("Successfully.") if verbose else None - except Exception as e: - print("Failed. {}.".format(e)) + path = pkg_resources.resource_filename(__name__, dat_dir) + for x in sub_dir: + path = os.path.join(path, x) + if mkdir: + path_to_file, ext = os.path.splitext(path) + if ext == '': + os.makedirs(path_to_file, exist_ok=True, **kwargs) + else: + os.makedirs(os.path.dirname(path_to_file), exist_ok=True, **kwargs) + return path -# ==================================================================================================================== -""" Converter """ +# -- Converters -------------------------------------------------------------- -# Convert "miles.chains" to Network Rail mileages -def mile_chain_to_nr_mileage(miles_chains) -> str: +def mile_chain_to_nr_mileage(miles_chains): """ - :param miles_chains: [str; np.nan; None] 'miles.chains' - :return: [str] 'miles.yards' + Convert mileage data in the form '.' to Network Rail mileage. - Note on the 'ELRs and mileages' web page that 'mileages' are given in the form 'miles.chains'. + :param miles_chains: mileage data presented in the form '.' + :type miles_chains: str, numpy.nan, None + :return: Network Rail mileage in the form '.' + :rtype: str + + **Examples**:: + + from pyrcs.utils import mile_chain_to_nr_mileage - Testing e.g. miles_chains = '0.18' # AAM 0.18 Tewkesbury Junction with ANZ (84.62) mile_chain_to_nr_mileage(miles_chains) # '0.0396' - miles_chains = '' # np.nan # None + + miles_chains = None # or np.nan, or '' mile_chain_to_nr_mileage(miles_chains) # '' """ + if pd.notna(miles_chains) and miles_chains != '': miles, chains = str(miles_chains).split('.') yards = measurement.measures.Distance(chain=chains).yd @@ -120,20 +90,26 @@ def mile_chain_to_nr_mileage(miles_chains) -> str: return network_rail_mileage -# Convert Network Rail mileages to "miles.chains" -def nr_mileage_to_mile_chain(str_mileage) -> str: +def nr_mileage_to_mile_chain(str_mileage): """ - :param str_mileage: [str; np.nan; None] 'miles.yards' - :return: [str] 'miles.chains' + Convert Network Rail mileage to the form '.'. - Note on the 'ELRs and mileages' web page that 'mileages' are given in the form 'miles.chains'. + :param str_mileage: Network Rail mileage data presented in the form '.' + :type str_mileage: str, numpy.nan, None + :return: '.' + :rtype: str + + **Examples**:: + + from pyrcs.utils import nr_mileage_to_mile_chain - Testing e.g. str_mileage = '0.0396' nr_mileage_to_mile_chain(str_mileage) # '0.18' - str_mileage = '' # np.nan # None + + str_mileage = None # or np.nan, or '' nr_mileage_to_mile_chain(str_mileage) # '' """ + if pd.notna(str_mileage) and str_mileage != '': miles, yards = str(str_mileage).split('.') chains = measurement.measures.Distance(yard=yards).chain @@ -143,41 +119,75 @@ def nr_mileage_to_mile_chain(str_mileage) -> str: return miles_chains -# Convert str type Network Rail mileage to numerical type -def nr_mileage_str_to_num(str_mileage: str) -> float: +def nr_mileage_str_to_num(str_mileage): """ - Testing e.g. + Convert string-type Network Rail mileage to numerical-type one. + + :param str_mileage: string-type Network Rail mileage in the form '.' + :type str_mileage: str + :return: numerical-type Network Rail mileage + :rtype: float + + **Examples**:: + + from pyrcs.utils import nr_mileage_str_to_num + str_mileage = '0.0396' nr_mileage_str_to_num(str_mileage) # 0.0396 + str_mileage = '' - nr_mileage_str_to_num(str_mileage) # np.nan + nr_mileage_str_to_num(str_mileage) # nan """ + num_mileage = np.nan if str_mileage == '' else round(float(str_mileage), 4) return num_mileage -# Convert Network Rail mileage to str type -def nr_mileage_num_to_str(num_mileage: float) -> str: +def nr_mileage_num_to_str(num_mileage): """ - Testing e.g. + Convert numerical-type Network Rail mileage to string-type one. + + :param num_mileage: numerical-type Network Rail mileage + :type num_mileage: float + :return: string-type Network Rail mileage in the form '.' + :rtype: str + + **Examples**:: + + import numpy as np + from pyrcs.utils import nr_mileage_num_to_str + num_mileage = 0.0396 nr_mileage_num_to_str(num_mileage) # '0.0396' + num_mileage = np.nan nr_mileage_num_to_str(num_mileage) # '' """ + nr_mileage = '%.4f' % round(float(num_mileage), 4) if num_mileage and pd.notna(num_mileage) else '' return nr_mileage -# Convert Network Rail mileages to yards -def nr_mileage_to_yards(nr_mileage: (float, str)) -> int: +def nr_mileage_to_yards(nr_mileage): """ - Testing e.g. + Convert Network Rail mileages to yards. + + :param nr_mileage: Network Rail mileage + :type nr_mileage: float, str + :return: yards + :rtype: int + + **Examples**:: + + from pyrcs.utils import nr_mileage_to_yards + nr_mileage = '0.0396' nr_mileage_to_yards(nr_mileage) # 396 + nr_mileage = 0.0396 nr_mileage_to_yards(nr_mileage) # 396 """ + if isinstance(nr_mileage, (float, np.float, int, np.integer)): nr_mileage = nr_mileage_num_to_str(nr_mileage) else: @@ -188,17 +198,29 @@ def nr_mileage_to_yards(nr_mileage: (float, str)) -> int: return yards -# Convert yards to Network Rail mileages -def yards_to_nr_mileage(yards: (int, float, np.nan)) -> str: +def yards_to_nr_mileage(yards): """ - Testing e.g. + Convert yards to Network Rail mileages. + + :param yards: yards + :type yards: int, float, numpy.nan, None + :return: Network Rail mileage in the form '.' + :rtype: str + + **Examples**:: + + from pyrcs.utils import yards_to_nr_mileage + yards = 396 yards_to_nr_mileage(yards) # '0.0396' + yards = 396.0 yards_to_nr_mileage(yards) # '0.0396' - yards = None # np.nan + + yards = None yards_to_nr_mileage(yards) # '' """ + if pd.notnull(yards) and yards != '': mileage_mi = np.floor(measurement.measures.Distance(yd=yards).mi) mileage_yd = yards - int(measurement.measures.Distance(mi=mileage_mi).yd) @@ -209,56 +231,95 @@ def yards_to_nr_mileage(yards: (int, float, np.nan)) -> str: return mileage -# For a location x where (start_mileage_num == end_mileage_num), consider a section [x - shift_yards, x + shift_yards] -def shift_num_nr_mileage(nr_mileage: (float, int, str), shift_yards: (int, float)) -> float: +def shift_num_nr_mileage(nr_mileage, shift_yards): """ - :param nr_mileage: [float] - :param shift_yards: [int] - :return: [float] + Shift Network Rail mileage by given yards. - Testing e.g. - nr_mileage = '0.0396' # 0.0396 # 10 - shift_yards = 220 # 220.99 - shift_num_mileage(nr_mileage, shift_yards) # 0.0616 # 0.0617 + :param nr_mileage: Network Rail mileage + :type nr_mileage: float, int, str + :param shift_yards: yards by which the given ``nr_mileage`` is shifted + :type shift_yards: int, float + :return: shifted numerical Network Rail mileage + :rtype: float + + **Examples**:: + + from pyrcs.utils import shift_num_nr_mileage + + nr_mileage = '0.0396' # or 0.0396 + shift_yards = 220 + shift_num_nr_mileage(nr_mileage, shift_yards) # 0.0616 + + nr_mileage = '0.0396' + shift_yards = 220.99 + shift_num_nr_mileage(nr_mileage, shift_yards) # 0.0617 + + nr_mileage = 10 + shift_yards = 220 + shift_num_nr_mileage(nr_mileage, shift_yards) # 10.022 """ + yards = nr_mileage_to_yards(nr_mileage) + shift_yards shifted_nr_mileage = yards_to_nr_mileage(yards) shifted_num_mileage = nr_mileage_str_to_num(shifted_nr_mileage) return shifted_num_mileage -# Convert calendar year to Network Rail financial year -def year_to_financial_year(date: datetime.datetime) -> int: +def year_to_financial_year(date): """ - Testing e.g. + Convert calendar year of a given date to Network Rail financial year. + + :param date: date + :type date: datetime.datetime + :return: Network Rail financial year of the given ``date`` + :rtype: int + + **Example**:: + + from pyrcs.utils import year_to_financial_year + date = datetime.datetime.now() - year_to_financial_year(date) + + year_to_financial_year(date) # 2020 """ + financial_date = date + pd.DateOffset(months=-3) return financial_date.year -# ==================================================================================================================== -""" Parser """ - +# -- Parsers ----------------------------------------------------------------- -# Get a list of parsed HTML tr's -def parse_tr(header, trs) -> list: +def parse_tr(header, trs): """ - :param header: [list] list of column names of a requested table - :param trs: [bs4.ResultSet - list of bs4.Tag] contents under 'tr' tags of the web page - :return: [list] list of lists each comprising a row of the requested table + Parse a list of parsed HTML elements. + + .. _parse-tr: + + See also [`PT-1 `_]. - Get a list of parsed contents of tr-tag's, each of which corresponds to a piece of record - Reference: https://stackoverflow.com/questions/28763891/ + :param header: list of column names of a requested table + :type header: list + :param trs: contents under tags of a web page + :type trs: bs4.ResultSet - list of bs4.Tag + :return: list of lists with each comprising a row of the requested table + :rtype: list - Testing e.g. - source = requests.get('http://www.railwaycodes.org.uk/tunnels/tunnels1.shtm') + **Example**:: + + import bs4 + import fake_useragent + from pyrcs.utils import parse_tr + + source = requests.get( + 'http://www.railwaycodes.org.uk/elrs/elra.shtm', + headers={'User-Agent': fake_useragent.UserAgent().random}) parsed_text = bs4.BeautifulSoup(source.text, 'lxml') header = [x.text for x in parsed_text.find_all('th')] # Column names - trs = parsed_text.find_all('table', attrs={'width': '1100px'})[1].find_all('tr') - parse_tr(header, trs) + trs = parsed_text.find_all('tr') + + parse_tr(header, trs) # returns a list of lists """ + tbl_lst = [] for row in trs: data = [] @@ -316,19 +377,33 @@ def parse_tr(header, trs) -> list: return tbl_lst -# Parse the acquired list to make it be ready for creating the DataFrame -def parse_table(source, parser='lxml') -> tuple: +def parse_table(source, parser='lxml'): """ - :param source: [requests.Response] response object to connecting a URL to request a table - :param parser: [str] (default: 'lxml'; alternatives: 'html5lib', 'html.parser') - :return [tuple] ([list] of lists each comprising a row of the requested table - (see also parse_trs()) - [list] of column names of the requested table) + Parse HTML elements for creating a data frame. + + :param source: response object to connecting a URL to request a table + :type source: requests.Response + :param parser: ``'lxml'`` (default), ``'html5lib'`` or ``'html.parser'`` + :type parser: str + :return: + - a list of lists each comprising a row of the requested table (see also :ref:`parse_tr() `) and + - a list of column names of the requested table + :rtype: tuple - Testing e.g. - source = requests.get('http://www.railwaycodes.org.uk/tunnels/tunnels1.shtm') + **Examples**:: + + import bs4 + import fake_useragent + from pyrcs.utils import parse_table + + source = requests.get( + 'http://www.railwaycodes.org.uk/elrs/elra.shtm', + headers={'User-Agent': fake_useragent.UserAgent().random}) parser = 'lxml' - parse_table(source, parser='lxml') + + parse_table(source, parser) """ + # Get plain text from the source URL web_page_text = source.text # (If source.status_code == 200, the requested URL is available.) # Parse the text @@ -344,98 +419,155 @@ def parse_table(source, parser='lxml') -> tuple: return parse_tr(header, trs), header -# Parse location note -def parse_location_note(location_dat) -> tuple: +def parse_location_name(location_name): """ - :param location_dat: [str; None] - :return: [tuple] ([str] - Location name, [str] - Note) + Parse location name (and its associated note). + + :param location_name: location name (in raw data) + :type location_name: str, None + :return: location name and, if any, note + :rtype: tuple + + **Examples**:: + + from pyrcs.utils import parse_location_name - Testing e.g. location_dat = 'Abbey Wood' - parse_location_note(location_dat) + parse_location_name(location_dat) + # ('Abbey Wood', '') + + location_dat = None + parse_location_name(location_dat) + # ('', '') + location_dat = 'Abercynon (formerly Abercynon South)' - parse_location_note(location_dat) + parse_location_name(location_dat) + # ('Abercynon', 'formerly Abercynon South') + location_dat = 'Allerton (reopened as Liverpool South Parkway)' - parse_location_note(location_dat) + parse_location_name(location_dat) + # ('Allerton', 'reopened as Liverpool South Parkway') + location_dat = 'Ashford International [domestic portion]' - parse_location_note(location_dat) - """ - # Location name - d = re.search(r'.*(?= \[[\"\']\()', location_dat) - if d is not None: - dat = d.group() - elif ' [unknown feature, labelled "do not use"]' in location_dat: - dat = re.search(r'\w+(?= \[unknown feature, )', location_dat).group() - elif ') [formerly' in location_dat: - dat = re.search(r'.*(?= \[formerly)', location_dat).group() - else: - m_pattern = re.compile( - r'[Oo]riginally |[Ff]ormerly |[Ll]ater |[Pp]resumed | \(was | \(in | \(at | \(also |' - r' \(second code |\?|\n| \(\[\'| \(definition unknown\)| \(reopened |( portion])$') - x_tmp = re.search(r'(?=[\[(]).*(?<=[\])])|(?=\().*(?<=\) \[)', location_dat) - x_tmp = x_tmp.group() if x_tmp is not None else location_dat - dat = ' '.join(location_dat.replace(x_tmp, '').split()) if re.search(m_pattern, location_dat) else location_dat - - # Note - y = location_dat.replace(dat, '', 1).strip() - if y == '': - note = '' + parse_location_name(location_dat) + # ('Ashford International', 'domestic portion') + """ + + if location_name is None: + dat, note = '', '' + else: - n = re.search(r'(?<=[\[(])[\w ,?]+(?=[])])', y) - if n is None: - n = re.search(r'(?<=(\[[\'\"]\()|(\([\'\"]\[)|(\) \[)).*(?=(\)[\'\"]\])|(\][\'\"]\))|\])', y) - elif '"now deleted"' in y and y.startswith('(') and y.endswith(')'): - n = re.search(r'(?<=\().*(?=\))', y) - note = n.group() if n is not None else '' - if note.endswith('\'') or note.endswith('"'): - note = note[:-1] - - if 'STANOX ' in dat and 'STANOX ' in location_dat and note == '': - dat = location_dat[0:location_dat.find('STANOX')].strip() - note = location_dat[location_dat.find('STANOX'):] + # Location name + d = re.search(r'.*(?= \[[\"\']\()', location_name) + if d is not None: + dat = d.group() + elif ' [unknown feature, labelled "do not use"]' in location_name: + dat = re.search(r'\w+(?= \[unknown feature, )', location_name).group() + elif ') [formerly' in location_name: + dat = re.search(r'.*(?= \[formerly)', location_name).group() + else: + m_pattern = re.compile( + r'[Oo]riginally |[Ff]ormerly |[Ll]ater |[Pp]resumed | \(was | \(in | \(at | \(also |' + r' \(second code |\?|\n| \(\[\'| \(definition unknown\)| \(reopened |( portion])$') + x_tmp = re.search(r'(?=[\[(]).*(?<=[\])])|(?=\().*(?<=\) \[)', location_name) + x_tmp = x_tmp.group() if x_tmp is not None else location_name + if re.search(m_pattern, location_name): + dat = ' '.join(location_name.replace(x_tmp, '').split()) + else: + dat = location_name + + # Note + y = location_name.replace(dat, '', 1).strip() + if y == '': + note = '' + else: + n = re.search(r'(?<=[\[(])[\w ,?]+(?=[])])', y) + if n is None: + n = re.search(r'(?<=(\[[\'\"]\()|(\([\'\"]\[)|(\) \[)).*(?=(\)[\'\"]\])|(\][\'\"]\))|\])', y) + elif '"now deleted"' in y and y.startswith('(') and y.endswith(')'): + n = re.search(r'(?<=\().*(?=\))', y) + note = n.group() if n is not None else '' + if note.endswith('\'') or note.endswith('"'): + note = note[:-1] + + if 'STANOX ' in dat and 'STANOX ' in location_name and note == '': + dat = location_name[0:location_name.find('STANOX')].strip() + note = location_name[location_name.find('STANOX'):] return dat, note -# Parse date string -def parse_date(str_date, as_date_type=False) -> (str, datetime.date): +def parse_date(str_date, as_date_type=False): """ - :param str_date: [str] - :param as_date_type: [bool] (default: False) - :return: [str; datetime.date] the date formatted as needed + Parse a date. + + :param str_date: string-type date + :type str_date: str + :param as_date_type: whether to return the date as `datetime.date`_, defaults to ``False`` + :type as_date_type: bool + :return: parsed date as a string or `datetime.date`_ + :rtype: str, datetime.date + + .. _`datetime.date`: https://docs.python.org/3/library/datetime.html#datetime.date + + **Examples**:: + + from pyrcs.utils import parse_date + + str_date = '2020-01-01' - Testing e.g. - str_date = '2019-01-01' as_date_type = True - parse_date(str_date, as_date_type) + parse_date(str_date, as_date_type) # datetime.date(2020, 1, 1) """ - temp_date = dateutil.parser.parse(str_date, fuzzy=True) # datetime.strptime(last_update_date[12:], '%d %B %Y') + + temp_date = dateutil.parser.parse(str_date, fuzzy=True) + # or, temp_date = datetime.strptime(last_update_date[12:], '%d %B %Y') + parsed_date = temp_date.date() if as_date_type else str(temp_date.date()) - return parsed_date + return parsed_date -# ==================================================================================================================== -""" Get useful information """ +# -- Get useful information -------------------------------------------------- -# Get last update date -def get_last_updated_date(url, parsed=True, date_type=False) -> (str, None): +def get_last_updated_date(url, parsed=True, as_date_type=False): """ - :param url: [str] URL link of a requested web page - :param parsed: [bool] (default: True) indicator of whether to reformat the date - :param date_type: [bool] (default: False) - :return:[str; None] date of when the specified web page was last updated + Get last update date. + + :param url: URL link of a requested web page + :type url: str + :param parsed: whether to reformat the date, defaults to ``True`` + :type parsed: bool + :param as_date_type: whether to return the date as `datetime.date`_, defaults to ``False`` + :type as_date_type: bool + :return: date of when the specified web page was last updated + :rtype: str, datetime.date, None + + .. _`datetime.date`: https://docs.python.org/3/library/datetime.html#datetime.date + + **Examples**:: + + from pyrcs.utils import get_last_updated_date + + parsed = True + + url = 'http://www.railwaycodes.org.uk/crs/CRSa.shtm' - Testing e.g. - url = 'http://www.railwaycodes.org.uk/crs/CRSa.shtm' - url_ = 'http://www.railwaycodes.org.uk/linedatamenu.shtm' - parsed = True date_type = False get_last_updated_date(url, parsed, date_type) - get_last_updated_date(url_, parsed, date_type) # None + # '--' + + date_type = True + get_last_updated_date(url, parsed, date_type) + # datetime.date(, , ) + + url = 'http://www.railwaycodes.org.uk/linedatamenu.shtm' + get_last_updated_date(url, parsed, date_type) + # None """ + # Request to get connected to the given url - source = requests.get(url) + source = requests.get(url, headers={'User-Agent': fake_useragent.UserAgent().random}) web_page_text = source.text # Parse the text scraped from the requested web page parsed_text = bs4.BeautifulSoup(web_page_text, 'lxml') # (Alternative parsers: 'html5lib', 'html.parser') @@ -446,27 +578,41 @@ def get_last_updated_date(url, parsed=True, date_type=False) -> (str, None): # Decide whether to convert the date's format if parsed: # Convert the date to "yyyy-mm-dd" format - last_update_date = parse_date(last_update_date, date_type) + last_update_date = parse_date(last_update_date, as_date_type) else: last_update_date = None # print('Information not available.') return last_update_date -# Get the catalogue for a class -def get_catalogue(cls_url, navigation_bar_exists=True, menu_exists=True) -> dict: +def get_catalogue(main_url, navigation_bar_exists=True, menu_exists=True): """ - :param cls_url: [str] - :param navigation_bar_exists: [bool] (default: True) - :param menu_exists: [bool] (default: True) - :return: [dict] {[str] - title: [str] - URL} + Get the catalogue for a class. + + :param main_url: URL of the main page of a code category + :type main_url: str + :param navigation_bar_exists: whether a navigation bar exists on the web page, defaults to ``True`` + :type navigation_bar_exists: bool + :param menu_exists: whether a menu exists on the web page, defaults to ``True`` + :type menu_exists: bool + :return: {'': '<URL>'} + :rtype: dict - Testing e.g. - url = 'http://www.railwaycodes.org.uk/crs/CRS0.shtm' + **Examples**:: + + from pyrcs.utils import get_catalogue + + menu_exists = True + + main_url = 'http://www.railwaycodes.org.uk/elrs/elr0.shtm' navigation_bar_exists = True - menu_exists = True - get_catalogue(url, navigation_bar_exists, menu_exists) + get_catalogue(main_url, navigation_bar_exists, menu_exists) + + main_url = 'http://www.railwaycodes.org.uk/linedatamenu.shtm' + navigation_bar_exists = False + get_catalogue(main_url, navigation_bar_exists, menu_exists) """ - source = requests.get(cls_url) + + source = requests.get(main_url, headers={'User-Agent': fake_useragent.UserAgent().random}) if navigation_bar_exists: cold_soup = bs4.BeautifulSoup(source.text, 'lxml').find_all('a', text=True, attrs={'class': None}) @@ -483,21 +629,31 @@ def get_catalogue(cls_url, navigation_bar_exists=True, menu_exists=True) -> dict source.close() - raw_contents = [{x.text: urllib.parse.urljoin(os.path.dirname(cls_url) + '/', x['href'])} for x in hot_soup] + raw_contents = [{x.text: urllib.parse.urljoin(os.path.dirname(main_url) + '/', x['href'])} for x in hot_soup] contents = dict(e for d in raw_contents for e in d.items()) return contents -# Get a menu of the available classes -def get_cls_menu(cls_url: str) -> dict: +def get_category_menu(menu_url): """ - Testing e.g. - cls_url = 'http://www.railwaycodes.org.uk/linedatamenu.shtm' - get_cls_menu(cls_url) + Get a menu of the available classes. + + :param menu_url: URL of the menu page + :type menu_url: str + :return: {'<category name>': {'<title>': '<URL>'}} + :rtype: dict + + **Example**:: + + from pyrcs.utils import get_category_menu + + menu_url = 'http://www.railwaycodes.org.uk/linedatamenu.shtm' + get_category_menu(menu_url) """ - source = requests.get(cls_url) + + source = requests.get(menu_url, headers={'User-Agent': fake_useragent.UserAgent().random}) soup = bs4.BeautifulSoup(source.text, 'lxml') h1, h2s = soup.find('h1'), soup.find_all('h2') @@ -505,12 +661,12 @@ def get_cls_menu(cls_url: str) -> dict: cls_name = h1.text.replace(' menu', '') if len(h2s) == 0: - cls_elem = dict((x.text, urllib.parse.urljoin(cls_url, x.get('href'))) for x in h1.find_all_next('a')) + cls_elem = dict((x.text, urllib.parse.urljoin(menu_url, x.get('href'))) for x in h1.find_all_next('a')) else: all_next = [x.replace(':', '') for x in h1.find_all_next(string=True) if x != '\n' and x != '\xa0'][2:] h2s_list = [x.text.replace(':', '') for x in h2s] - all_next_a = [(x.text, urllib.parse.urljoin(cls_url, x.get('href'))) for x in h1.find_all_next('a', href=True)] + all_next_a = [(x.text, urllib.parse.urljoin(menu_url, x.get('href'))) for x in h1.find_all_next('a', href=True)] idx = [all_next.index(x) for x in h2s_list] for i in idx: @@ -521,9 +677,9 @@ def get_cls_menu(cls_url: str) -> dict: if i == 0: d = dict(all_next_a[i:idx[i]]) elif i < len(idx): - d = {h2s_list[i-1]: dict(all_next_a[idx[i-1]+1:idx[i]])} + d = {h2s_list[i - 1]: dict(all_next_a[idx[i - 1] + 1:idx[i]])} else: - d = {h2s_list[i-1]: dict(all_next_a[idx[i-1]+1:])} + d = {h2s_list[i - 1]: dict(all_next_a[idx[i - 1] + 1:])} i += 1 cls_elem.update(d) @@ -531,26 +687,37 @@ def get_cls_menu(cls_url: str) -> dict: return cls_menu -# ==================================================================================================================== -""" Rectification of location names """ +# -- Rectification of location names ----------------------------------------- - -# Create a dict for replace location names -def fetch_location_names_repl_dict(k=None, regex=False, as_dataframe=False) -> dict: +def fetch_location_names_repl_dict(k=None, regex=False, as_dataframe=False): """ - :param k: [str; None (default)] - :param regex: [bool] (default: False) - :param as_dataframe: [bool] (default: False) - :return: [dict] + Create a dictionary for rectifying location names. + + :param k: key of the created dictionary, defaults to ``None`` + :type k: str, int, float, bool, None + :param regex: whether to create a dictionary for replacement based on regular expressions, defaults to ``False`` + :type regex: bool + :param as_dataframe: whether to return the created dictionary as a pandas.DataFrame, defaults to ``False`` + :type as_dataframe: bool + :return: dictionary for rectifying location names + :rtype: dict, pandas.DataFrame - Testing e.g. - k = None - regex = False + **Examples**:: + + from pyrcs.utils import fetch_location_names_repl_dict + + k = None + regex = False as_dataframe = True fetch_location_names_repl_dict(k, regex, as_dataframe) + + regex = True + as_dataframe = False + fetch_location_names_repl_dict(k, regex, as_dataframe) """ + json_filename = "location-names-repl{}.json".format("" if not regex else "-regex") - location_name_repl_dict = pyhelpers.store.load_json(cd_dat(json_filename)) + location_name_repl_dict = load_json(cd_dat(json_filename)) if regex: location_name_repl_dict = {re.compile(k): v for k, v in location_name_repl_dict.items()} @@ -558,31 +725,40 @@ def fetch_location_names_repl_dict(k=None, regex=False, as_dataframe=False) -> d replacement_dict = {k: location_name_repl_dict} if k else location_name_repl_dict if as_dataframe: - replacement_dict = pd.DataFrame.from_dict(replacement_dict) + replacement_dict = pd.DataFrame.from_dict(replacement_dict, orient='index', columns=['new_value']) return replacement_dict -# Rectify location names def update_location_name_repl_dict(new_items, regex, verbose=False): """ - :param new_items: [dict] - :param regex: [bool] - :param verbose: [bool] (default: False) + Update the location-name replacement dictionary in the package data. + + :param new_items: new items to replace + :type new_items: dict + :param regex: whether this update is for regular-expression dictionary + :type regex: bool + :param verbose: whether to print relevant information in console as the function runs, defaults to ``False`` + :type verbose: bool + + **Example**: + + from pyrcs.utils import update_location_name_repl_dict + + verbose = True - Testing e.g. new_items = {} - regex = False - verbose = True + regex = False update_location_name_repl_dict(new_items, regex, verbose) """ + json_filename = "location-names-repl{}.json".format("" if not regex else "-regex") new_items_keys = list(new_items.keys()) - if pyhelpers.ops.confirmed("To update \"{}\" with {{\"{}\"... }}?".format(json_filename, new_items_keys[0])): + if confirmed("To update \"{}\" with {{\"{}\"... }}?".format(json_filename, new_items_keys[0])): path_to_json = cd_dat(json_filename) - location_name_repl_dict = pyhelpers.store.load_json(path_to_json) + location_name_repl_dict = load_json(path_to_json) if any(isinstance(k, re.Pattern) for k in new_items_keys): new_items = {k.pattern: v for k, v in new_items.items() if isinstance(k, re.Pattern)} @@ -592,17 +768,22 @@ def update_location_name_repl_dict(new_items, regex, verbose=False): save_json(location_name_repl_dict, path_to_json, verbose=verbose) -# ==================================================================================================================== -""" Fixers """ +# -- Fixers ------------------------------------------------------------------ +def fix_num_stanox(stanox_code): + """ + Fix 'STANOX' if it is loaded as numbers. -# Fix 'STANOX' if it is loaded as numbers -def fix_num_stanox(stanox_x): + :param stanox_code: STANOX code + :type stanox_code: str, int + :return: standard STANOX code + :rtype: str + """ - if isinstance(stanox_x, numbers.Number): - stanox_x = '' if pd.isna(stanox_x) else str(int(stanox_x)) + if isinstance(stanox_code, (int, float)): + stanox_code = '' if pd.isna(stanox_code) else str(int(stanox_code)) - if len(stanox_x) < 5 and stanox_x != '': - stanox_x = '0' * (5 - len(stanox_x)) + stanox_x + if len(stanox_code) < 5 and stanox_code != '': + stanox_code = '0' * (5 - len(stanox_code)) + stanox_code - return stanox_x + return stanox_code