From 32be4908446407c28a8b9dfd7c492e94541895dc Mon Sep 17 00:00:00 2001 From: Ashok Prajapat Date: Wed, 18 Dec 2024 07:35:27 +0000 Subject: [PATCH 1/7] download and process script modified and unit test script added --- scripts/world_bank/datasets/README.md | 58 ++++ scripts/world_bank/datasets/datasets.py | 252 +++++++++++------- .../expected_files/expected_output.csv | 3 + .../input_data/WorldBank_FINDEX_CSV.csv | 3 + .../test_data/input_data/sv_mapping.csv | 3 + scripts/world_bank/datasets/unittestscript.py | 97 +++++++ 6 files changed, 322 insertions(+), 94 deletions(-) create mode 100644 scripts/world_bank/datasets/README.md create mode 100644 scripts/world_bank/datasets/test_data/expected_files/expected_output.csv create mode 100644 scripts/world_bank/datasets/test_data/input_data/WorldBank_FINDEX_CSV.csv create mode 100644 scripts/world_bank/datasets/test_data/input_data/sv_mapping.csv create mode 100644 scripts/world_bank/datasets/unittestscript.py diff --git a/scripts/world_bank/datasets/README.md b/scripts/world_bank/datasets/README.md new file mode 100644 index 0000000000..430b17f2d6 --- /dev/null +++ b/scripts/world_bank/datasets/README.md @@ -0,0 +1,58 @@ +# World Bank Datasets +- The WorldBankDatasets import contains data from multiple World Bank databases, such as World Development Indicators, Jobs, and Education Statistics. +- source: https://data.worldbank.org + +- how to download data: Data is downloaded automatically by the Python script (`datasets.py`). + +- type of place: Country. + +- statvars: All types + +- years: 1960 to 2050 + +- copyright year: 2024 + +## Processes WB datasets. 
+ +Update (September 2024): +To run all processing methods, do not pass the `--mode` flag. +Run: `python3 datasets.py` + +Or, to check for issues in an individual process, follow the steps below. +The following tasks are supported: + + +## Fetching datasets +- fetch_datasets: Fetches WB dataset lists and resources and writes them to 'output/wb-datasets.csv'. + +Run: `python3 datasets.py --mode=fetch_datasets` + +## Downloading the datasets +- download_datasets: Downloads datasets listed in 'output/wb-datasets.csv' to the 'output/downloads' folder. + +Run: `python3 datasets.py --mode=download_datasets` + +## Writing WB codes +- write_wb_codes: Extracts World Bank indicator codes (and related information) from files downloaded in the 'output/downloads' folder to 'output/wb-codes.csv'. + +It only operates on files that are named '*_CSV.zip' or '*_csv.zip'. + +Run: `python3 datasets.py --mode=write_wb_codes` + +## Loading the stat vars +- load_stat_vars: Loads stat vars from a mapping file specified via the `stat_vars_file` flag. +- Use this for debugging to ensure that the mappings load correctly, and fix any errors logged by this operation. + +Run: `python3 datasets.py --mode=load_stat_vars --stat_vars_file=/path/to/statvars.csv` + +- See `sample-svs.csv` for a sample mappings file. + +## Writing output files +- write_observations: Extracts observations from files downloaded in the 'output/downloads' folder and saves them to CSVs in the 'output/observations' folder. + +- The stat vars file to be used for mappings should be specified using the `stat_vars_file` flag. + +- It only operates on files that are named '*_CSV.zip' or '*_csv.zip'. 
+ +Run: `python3 datasets.py --mode=write_observations --stat_vars_file=/path/to/statvars.csv` + diff --git a/scripts/world_bank/datasets/datasets.py b/scripts/world_bank/datasets/datasets.py index fe9d33563d..ba610f7578 100644 --- a/scripts/world_bank/datasets/datasets.py +++ b/scripts/world_bank/datasets/datasets.py @@ -13,6 +13,12 @@ # limitations under the License. """Processes WB datasets. +update september 2024: +To run all processing methods , please do not pass the mode +Run: python3 datasets.py + +Or If required to check issue in any individual process follow all the steps as below: + Supports the following tasks: ============================ @@ -41,7 +47,7 @@ Use this for debugging to ensure that the mappings load correctly and fix any errors logged by this operation. -Run: python3 datasets.py --mode=load_stat_vars --stat_vars_file=/path/to/sv_mappings.csv +Run: python3 datasets.py --mode=load_stat_vars --stat_vars_file=/path/to/statvars.csv See `sample-svs.csv` for a sample mappings file. @@ -53,7 +59,7 @@ It only operates on files that are named '*_CSV.zip'. 
-Run: python3 datasets.py --mode=write_observations --stat_vars_file=/path/to/sv_mappings.csv +Run: python3 datasets.py --mode=write_observations --stat_vars_file=/path/to/statvars.csv """ import requests @@ -66,11 +72,13 @@ import re import urllib3 from urllib3.util.ssl_ import create_urllib3_context +from urllib3.exceptions import HTTPError from absl import flags import zipfile import codecs from itertools import repeat from datetime import datetime +from retry import retry FLAGS = flags.FLAGS @@ -84,7 +92,7 @@ class Mode: flags.DEFINE_string( - 'mode', Mode.WRITE_OBSERVATIONS, + 'mode', None, f"Specify one of the following modes: {Mode.FETCH_DATASETS}, {Mode.DOWNLOAD_DATASETS}, {Mode.WRITE_WB_CODES}, {Mode.LOAD_STAT_VARS}, {Mode.WRITE_OBSERVATIONS}" ) @@ -111,7 +119,7 @@ class Mode: os.makedirs(DOWNLOADS_DIR, exist_ok=True) os.makedirs(OBSERVATIONS_DIR, exist_ok=True) -POOL_SIZE = max(2, multiprocessing.cpu_count() - 1) +POOL_SIZE = 3 #max(2, multiprocessing.cpu_count() - 1) DOWNLOADABLE_RESOURCE_TYPES = set(["Download", "Dataset"]) @@ -131,7 +139,7 @@ class Mode: def download_datasets(): '''Downloads dataset files. This is a very expensive operation so run it with care. 
It assumes that the datasets CSV is already available.''' - + logging.info('start download_datasets') with open(DATASETS_CSV_FILE_PATH, 'r') as f: csv_rows = list(csv.DictReader(f)) download_urls = [] @@ -139,11 +147,13 @@ def download_datasets(): download_url = csv_row.get(DATASET_DOWNLOAD_URL_COLUMN_NAME) if download_url: download_urls.append(download_url) - + try: with multiprocessing.Pool(POOL_SIZE) as pool: pool.starmap(download, zip(download_urls)) logging.info('# files downloaded: %s', len(download_urls)) + except Exception as e: + logging.error("Error downloading %s", exc_info=e) def download(url): @@ -154,18 +164,26 @@ def download(url): return logging.info('Downloading %s to file %s', url, file_path) + + # response = requests.get(url) + # Using urllib3 for downloading content to avoid SSL issue. + # See: https://github.com/urllib3/urllib3/issues/2653#issuecomment-1165418616 try: - # response = requests.get(url) - # Using urllib3 for downloading content to avoid SSL issue. - # See: https://github.com/urllib3/urllib3/issues/2653#issuecomment-1165418616 - with urllib3.PoolManager(ssl_context=ctx) as http: - response = http.request("GET", url) + response = download_retry(url) with open(file_path, 'wb') as f: f.write(response.data) except Exception as e: logging.error("Error downloading %s", url, exc_info=e) +@retry(tries=3, delay=2, backoff=2) +def download_retry(url): + with urllib3.PoolManager(ssl_context=ctx, timeout=90) as http: + logging.info('# retrying for url: %s', url) + response = http.request("GET", url) + return response + + def fetch_and_write_datasets_csv(): fetch_dataset_lists() fetch_dataset_views() @@ -201,7 +219,7 @@ def get_datasets_csv_rows(): return csv_rows -DATASET_URL_FIELDS = ['harvet_source', 'url', 'website_url'] +DATASET_URL_FIELDS = ['website_url', 'harvest_source', 'url'] # URLs with this pattern are downloadable only if the URL is trunctated until it. Probably a bug in WB APIs. 
VERSION_ID_PATTERN = '?versionId=' @@ -277,11 +295,15 @@ def load_json(url, params, response_file): return json.load(f) logging.info("Fetching url %s, params %s", url, params) - response = requests.get(url, params=params).json() - with open(response_file, 'w') as f: - logging.info('Writing response to file %s', response_file) - json.dump(response, f, indent=2) - return response + try: + response = requests.get(url, params=params) + with open(response_file, 'w') as f: + logging.info('Writing response to file %s', response_file) + json.dump(response.json(), f, indent=2) + return True + except Exception as e: + print(f"Http error {e}") + return None def load_json_file(json_file): @@ -351,7 +373,8 @@ def write_wb_codes(): def get_all_codes(): all_codes = {} for file_name in os.listdir(DOWNLOADS_DIR): - if file_name.endswith(CSV_ZIP_FILE_SUFFIX): + if file_name.endswith(CSV_ZIP_FILE_SUFFIX) or file_name.endswith( + '_csv.zip'): zip_file = f"{DOWNLOADS_DIR}/{file_name}" codes = get_codes_from_zip(zip_file) if codes: @@ -366,47 +389,55 @@ def get_all_codes(): def get_codes_from_zip(zip_file): - with zipfile.ZipFile(zip_file, 'r') as zip: - (_, series_file) = get_data_and_series_file_names(zip) - if series_file is None: - logging.warning('No series file found in ZIP file: %s', zip_file) - else: - with zip.open(series_file, 'r') as csv_file: - series_rows = sanitize_csv_rows( - list(csv.DictReader(codecs.iterdecode(csv_file, 'utf-8')))) - num_codes = len(series_rows) - logging.info('# code(s) in %s: %s', zip_file, num_codes) - if num_codes == 0: - return {} - - if series_rows[0].get(SERIES_CODE_KEY) is None: - logging.error('No series code found in %s, sample row: %s', - zip_file, series_rows[0]) - return {} - - codes = {} - for series_row in series_rows: - code = series_row.get(SERIES_CODE_KEY) - codes[code] = { - SERIES_CODE_KEY: - code, - INDICATOR_NAME_KEY: - series_row.get(INDICATOR_NAME_KEY), - NUM_DATASETS_KEY: - 1, - TOPIC_KEY: - series_row.get(TOPIC_KEY), - 
UNIT_OF_MEASURE_KEY: - series_row.get(UNIT_OF_MEASURE_KEY), - SHORT_DEFINITION_KEY: - series_row.get(SHORT_DEFINITION_KEY), - LONG_DEFINITION_KEY: - series_row.get(LONG_DEFINITION_KEY), - LICENSE_TYPE_KEY: - series_row.get(LICENSE_TYPE_KEY), - } - return codes - return {} + try: + with zipfile.ZipFile(zip_file, 'r') as zip: + (_, series_file) = get_data_and_series_file_names(zip) + if series_file is None: + logging.warning('No series file found in ZIP file: %s', + zip_file) + else: + with zip.open(series_file, 'r') as csv_file: + series_rows = sanitize_csv_rows( + list( + csv.DictReader(codecs.iterdecode(csv_file, + 'utf-8')))) + num_codes = len(series_rows) + logging.info('# code(s) in %s: %s', zip_file, num_codes) + if num_codes == 0: + return {} + + if series_rows[0].get(SERIES_CODE_KEY) is None: + logging.error( + 'No series code found in %s, sample row: %s', + zip_file, series_rows[0]) + return {} + + codes = {} + for series_row in series_rows: + code = series_row.get(SERIES_CODE_KEY) + codes[code] = { + SERIES_CODE_KEY: + code, + INDICATOR_NAME_KEY: + series_row.get(INDICATOR_NAME_KEY), + NUM_DATASETS_KEY: + 1, + TOPIC_KEY: + series_row.get(TOPIC_KEY), + UNIT_OF_MEASURE_KEY: + series_row.get(UNIT_OF_MEASURE_KEY), + SHORT_DEFINITION_KEY: + series_row.get(SHORT_DEFINITION_KEY), + LONG_DEFINITION_KEY: + series_row.get(LONG_DEFINITION_KEY), + LICENSE_TYPE_KEY: + series_row.get(LICENSE_TYPE_KEY), + } + return codes + return {} + except Exception as e: + print("There is some problem in processing the file", e, + "File name is:", zipfile) def write_csv(csv_file_path, csv_columns, csv_rows): @@ -426,17 +457,36 @@ def write_all_observations(stat_vars_file): zip_files = [] for file_name in os.listdir(DOWNLOADS_DIR): - if file_name.endswith(CSV_ZIP_FILE_SUFFIX): + if file_name.endswith(CSV_ZIP_FILE_SUFFIX) or file_name.endswith( + '_csv.zip'): zip_files.append(f"{DOWNLOADS_DIR}/{file_name}") with multiprocessing.Pool(POOL_SIZE) as pool: 
pool.starmap(write_observations_from_zip, zip(zip_files, repeat(svs))) + check_allFiles_processed() + end = datetime.now() logging.info('End: %s', end) logging.info('Duration: %s', str(end - start)) +def check_allFiles_processed(): + expected_files = [ + 'ASPIRE_CSV_obs.csv', 'DB_CSV_obs.csv', 'Economic_Fitness_CSV_obs.csv', + 'EdStats_CSV_obs.csv', 'FINDEX_CSV_obs.csv', 'GFDD_CSV_obs.csv', + 'GPFI_CSV_obs.csv', 'HCI_CSV_obs.csv', 'HEFPI_CSV_obs.csv', + 'IDA_CSV_obs.csv', 'MDG_CSV_obs.csv', 'PovStats_CSV_obs.csv', + 'SDG_CSV_obs.csv', 'SE4ALL_CSV_obs.csv', + 'Subnational-Population_CSV_obs.csv', 'Subnational-Poverty_CSV_obs.csv', + 'WWBI_CSV_obs.csv' + ] + expected_files = sorted(set(expected_files)) + actual_output_files = sorted(set(os.listdir(OBSERVATIONS_DIR))) + if actual_output_files != expected_files: + logging.fatal('actual output files are not equal to expected') + + def write_observations_from_zip(zip_file, svs): csv_rows = get_observations_from_zip(zip_file, svs) if len(csv_rows) == 0: @@ -453,27 +503,33 @@ def write_observations_from_zip(zip_file, svs): def get_observations_from_zip(zip_file, svs): - with zipfile.ZipFile(zip_file, 'r') as zip: - (data_file, _) = get_data_and_series_file_names(zip) - if data_file is None: - logging.warning('No data file found in ZIP file: %s', zip_file) - return [] - else: - # Use name of file (excluding the extension) as the measurement method - measurement_method = f"{WORLD_BANK_MEASUREMENT_METHOD_PREFIX}_{zip_file.split('/')[-1].split('.')[0]}" - with zip.open(data_file, 'r') as csv_file: - data_rows = sanitize_csv_rows( - list(csv.DictReader(codecs.iterdecode(csv_file, 'utf-8')))) - num_rows = len(data_rows) - logging.info('# data rows in %s: %s', zip_file, num_rows) - - obs_csv_rows = [] - for data_row in data_rows: - obs_csv_rows.extend( - get_observations_from_data_row(data_row, svs, - measurement_method)) - - return obs_csv_rows + try: + with zipfile.ZipFile(zip_file, 'r') as zip: + (data_file, _) = 
get_data_and_series_file_names(zip) + if data_file is None: + logging.warning('No data file found in ZIP file: %s', zip_file) + return [] + else: + # Use name of file (excluding the extension) as the measurement method + measurement_method = f"{WORLD_BANK_MEASUREMENT_METHOD_PREFIX}_{zip_file.split('/')[-1].split('.')[0]}" + with zip.open(data_file, 'r') as csv_file: + data_rows = sanitize_csv_rows( + list( + csv.DictReader(codecs.iterdecode(csv_file, + 'utf-8')))) + num_rows = len(data_rows) + logging.info('# data rows in %s: %s', zip_file, num_rows) + + obs_csv_rows = [] + for data_row in data_rows: + obs_csv_rows.extend( + get_observations_from_data_row( + data_row, svs, measurement_method)) + + return obs_csv_rows + except Exception as e: + print("There is problem while processing the zip file:", e) + return [] def get_observations_from_data_row(data_row, svs, measurement_method): @@ -571,19 +627,27 @@ def get_data_and_series_file_names(zip): def main(_): - match FLAGS.mode: - case Mode.FETCH_DATASETS: - download_datasets() - case Mode.DOWNLOAD_DATASETS: - fetch_and_write_datasets_csv() - case Mode.WRITE_WB_CODES: - write_wb_codes() - case Mode.LOAD_STAT_VARS: - load_stat_vars(FLAGS.stat_vars_file) - case Mode.WRITE_OBSERVATIONS: - write_all_observations(FLAGS.stat_vars_file) - case _: - logging.error('No mode specified.') + logging.info(FLAGS.mode) + if not FLAGS.mode: + fetch_and_write_datasets_csv() + download_datasets() + write_wb_codes() + load_stat_vars(FLAGS.stat_vars_file) + write_all_observations(FLAGS.stat_vars_file) + else: + match FLAGS.mode: + case Mode.FETCH_DATASETS: + fetch_and_write_datasets_csv() + case Mode.DOWNLOAD_DATASETS: + download_datasets() + case Mode.WRITE_WB_CODES: + write_wb_codes() + case Mode.LOAD_STAT_VARS: + load_stat_vars(FLAGS.stat_vars_file) + case Mode.WRITE_OBSERVATIONS: + write_all_observations(FLAGS.stat_vars_file) + case _: + logging.error('No mode specified.') if __name__ == '__main__': diff --git 
a/scripts/world_bank/datasets/test_data/expected_files/expected_output.csv b/scripts/world_bank/datasets/test_data/expected_files/expected_output.csv new file mode 100644 index 0000000000..665d299d59 --- /dev/null +++ b/scripts/world_bank/datasets/test_data/expected_files/expected_output.csv @@ -0,0 +1,3 @@ +indicatorcode,statvar,measurementmethod,observationabout,observationdate,observationvalue,unit +account.t.d,worldBank/account_t_d,WorldBank_FINDEX_CSV,dcid:country/ARB,2011,22.48, +account.t.d.1,worldBank/account_t_d_1,WorldBank_FINDEX_CSV,dcid:country/EAS,2011,57.5, \ No newline at end of file diff --git a/scripts/world_bank/datasets/test_data/input_data/WorldBank_FINDEX_CSV.csv b/scripts/world_bank/datasets/test_data/input_data/WorldBank_FINDEX_CSV.csv new file mode 100644 index 0000000000..e7df731d90 --- /dev/null +++ b/scripts/world_bank/datasets/test_data/input_data/WorldBank_FINDEX_CSV.csv @@ -0,0 +1,3 @@ +countryname,countrycode,indicatorname,indicatorcode,2011,2014,2017,2021,2022,, +Arab World,ARB,Account (% age 15+),account.t.d,22.48,30.47,37.23,40.21,,, +East Asia & Pacific,EAS,Mobile money account,account.t.d.1,57.5,70.24,71.17,81.38,,, \ No newline at end of file diff --git a/scripts/world_bank/datasets/test_data/input_data/sv_mapping.csv b/scripts/world_bank/datasets/test_data/input_data/sv_mapping.csv new file mode 100644 index 0000000000..5130e4ac14 --- /dev/null +++ b/scripts/world_bank/datasets/test_data/input_data/sv_mapping.csv @@ -0,0 +1,3 @@ +seriescode,unit,statvar +account.t.d,,worldBank/account_t_d +account.t.d.1,,worldBank/account_t_d_1 diff --git a/scripts/world_bank/datasets/unittestscript.py b/scripts/world_bank/datasets/unittestscript.py new file mode 100644 index 0000000000..02fb378984 --- /dev/null +++ b/scripts/world_bank/datasets/unittestscript.py @@ -0,0 +1,97 @@ +# # Copyright 2020 Google LLC +# # +# # Licensed under the Apache License, Version 2.0 (the "License"); +# # you may not use this file except in compliance with the 
License. +# # You may obtain a copy of the License at +# # +# # https://www.apache.org/licenses/LICENSE-2.0 +# # +# # Unless required by applicable law or agreed to in writing, software +# # distributed under the License is distributed on an "AS IS" BASIS, +# # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# # See the License for the specific language governing permissions and +# # limitations under the License. + +import datasets +import unittest +import os +import pandas as pd +import csv +import re +import codecs +import csv + +MODULE_DIR = os.path.dirname(__file__) + +TEST_DATASET_DIR = os.path.join(MODULE_DIR, "test_data", "input_data") +EXPECTED_FILES_DIR = os.path.join(MODULE_DIR, "test_data", "expected_files") + + +def get_datarow_measurement_method(filename): + data_row = [] + with open(os.path.join(TEST_DATASET_DIR, filename), 'r') as file_content: + csv_reader = csv.DictReader(file_content) + for row in csv_reader: + data_row.append(row) + measurement_method = re.match(r"(.*)\.csv", filename).group(1) + return data_row, measurement_method + + +def get_sv_mapping(indicator_code): + for file in os.listdir(TEST_DATASET_DIR): + if file.endswith('_mapping.csv'): + with open(os.path.join(TEST_DATASET_DIR, file), + 'r') as file_content: + csv_reader = csv.DictReader(file_content) + for row in csv_reader: + series_code = row['seriescode'] + if series_code == indicator_code: + svs = {series_code: row} + return svs + + +def fetch_expected_output(indicator_code): + for file in os.listdir(EXPECTED_FILES_DIR): + if file.endswith('.csv'): + with open(os.path.join(EXPECTED_FILES_DIR, file), + 'r') as file_content: + csv_reader = csv.DictReader(file_content) + for row in csv_reader: + indicator = row['indicatorcode'] + if indicator == indicator_code: + expected_output = row + return expected_output + + +class TestMyFunction(unittest.TestCase): + + def get_required_data(self, filename, indicator_code): + data_row, measurement_method = 
get_datarow_measurement_method( + 'WorldBank_FINDEX_CSV.csv') + svs = get_sv_mapping(indicator_code) + expected_output = fetch_expected_output(indicator_code) + return data_row, measurement_method, svs, expected_output + + def test_input1(self): + data_row, measurement_method, svs, expected_output = self.get_required_data( + 'WorldBank_FINDEX_CSV.csv', + 'account.t.d', + ) + self.assertEqual( + datasets.get_observations_from_data_row(data_row[0], svs, + measurement_method)[0], + expected_output) + + def test_input2(self): + data_row, measurement_method, svs, expected_output = self.get_required_data( + 'WorldBank_FINDEX_CSV.csv', + 'account.t.d.1', + ) + self.assertEqual( + datasets.get_observations_from_data_row(data_row[1], svs, + measurement_method)[0], + expected_output) + + +if __name__ == '__main__': + unittest.main() From 78961403e95083bf34dd3abf1c4777e534163358 Mon Sep 17 00:00:00 2001 From: Ashok Prajapat Date: Wed, 18 Dec 2024 09:43:08 +0000 Subject: [PATCH 2/7] test script file name changed --- .../world_bank/datasets/{unittestscript.py => datasets_test.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename scripts/world_bank/datasets/{unittestscript.py => datasets_test.py} (100%) diff --git a/scripts/world_bank/datasets/unittestscript.py b/scripts/world_bank/datasets/datasets_test.py similarity index 100% rename from scripts/world_bank/datasets/unittestscript.py rename to scripts/world_bank/datasets/datasets_test.py From 00847539a5e25de735ff4ad2bb03a37972e9310f Mon Sep 17 00:00:00 2001 From: Ashok Prajapat Date: Thu, 19 Dec 2024 04:49:48 +0000 Subject: [PATCH 3/7] commit to check, is all checks passing or not --- scripts/world_bank/datasets/datasets.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/world_bank/datasets/datasets.py b/scripts/world_bank/datasets/datasets.py index ba610f7578..299b1589de 100644 --- a/scripts/world_bank/datasets/datasets.py +++ b/scripts/world_bank/datasets/datasets.py @@ -162,7 +162,7 @@ def 
download(url): if os.path.exists(file_path): logging.info('Already downloaded %s to file %s', url, file_path) return - + print("just checking") logging.info('Downloading %s to file %s', url, file_path) # response = requests.get(url) From 9ff49ddd25e62b0a25710fd2d89729cfec614daa Mon Sep 17 00:00:00 2001 From: Ashok Prajapat Date: Fri, 20 Dec 2024 09:12:55 +0000 Subject: [PATCH 4/7] review comments resolved --- scripts/world_bank/datasets/datasets.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/scripts/world_bank/datasets/datasets.py b/scripts/world_bank/datasets/datasets.py index 299b1589de..01574a51db 100644 --- a/scripts/world_bank/datasets/datasets.py +++ b/scripts/world_bank/datasets/datasets.py @@ -152,6 +152,8 @@ def download_datasets(): pool.starmap(download, zip(download_urls)) logging.info('# files downloaded: %s', len(download_urls)) + # While downloading from source there is multiple files which may not be required so below exception can be ignored. + # Verifying if all the required files have been generated after writing the output files except Exception as e: logging.error("Error downloading %s", exc_info=e) @@ -162,7 +164,6 @@ def download(url): if os.path.exists(file_path): logging.info('Already downloaded %s to file %s', url, file_path) return - print("just checking") logging.info('Downloading %s to file %s', url, file_path) # response = requests.get(url) @@ -172,6 +173,7 @@ def download(url): response = download_retry(url) with open(file_path, 'wb') as f: f.write(response.data) + # After retrying for multiple times it will move to download next one, fatal not required. 
except Exception as e: logging.error("Error downloading %s", url, exc_info=e) @@ -302,7 +304,7 @@ def load_json(url, params, response_file): json.dump(response.json(), f, indent=2) return True except Exception as e: - print(f"Http error {e}") + logging.info("Http error %s",e) return None @@ -436,8 +438,7 @@ def get_codes_from_zip(zip_file): return codes return {} except Exception as e: - print("There is some problem in processing the file", e, - "File name is:", zipfile) + logging.info("There is some problem in processing the file %s File name is: %s",e,zipfile) def write_csv(csv_file_path, csv_columns, csv_rows): @@ -472,6 +473,7 @@ def write_all_observations(stat_vars_file): def check_allFiles_processed(): + # Verify below observation csv are getting generated or not expected_files = [ 'ASPIRE_CSV_obs.csv', 'DB_CSV_obs.csv', 'Economic_Fitness_CSV_obs.csv', 'EdStats_CSV_obs.csv', 'FINDEX_CSV_obs.csv', 'GFDD_CSV_obs.csv', @@ -483,6 +485,7 @@ def check_allFiles_processed(): ] expected_files = sorted(set(expected_files)) actual_output_files = sorted(set(os.listdir(OBSERVATIONS_DIR))) + # If actual processed files are not equal to expected files, raising fatal if actual_output_files != expected_files: logging.fatal('actual output files are not equal to expected') @@ -527,8 +530,9 @@ def get_observations_from_zip(zip_file, svs): data_row, svs, measurement_method)) return obs_csv_rows + # Exception can be ignored as there might be some corrupted zip files from source except Exception as e: - print("There is problem while processing the zip file:", e) + logging.info("There is problem while processing the zip file: %s",e) return [] From 4a448b8623e550110ad47df50415b4382ca255d3 Mon Sep 17 00:00:00 2001 From: Ashok Prajapat Date: Fri, 20 Dec 2024 09:24:25 +0000 Subject: [PATCH 5/7] lint fixed --- scripts/world_bank/datasets/datasets.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/scripts/world_bank/datasets/datasets.py 
b/scripts/world_bank/datasets/datasets.py index 01574a51db..0793fb4cbb 100644 --- a/scripts/world_bank/datasets/datasets.py +++ b/scripts/world_bank/datasets/datasets.py @@ -152,7 +152,7 @@ def download_datasets(): pool.starmap(download, zip(download_urls)) logging.info('# files downloaded: %s', len(download_urls)) - # While downloading from source there is multiple files which may not be required so below exception can be ignored. + # While downloading from source there is multiple files which may not be required so below exception can be ignored. # Verifying if all the required files have been generated after writing the output files except Exception as e: logging.error("Error downloading %s", exc_info=e) @@ -304,7 +304,7 @@ def load_json(url, params, response_file): json.dump(response.json(), f, indent=2) return True except Exception as e: - logging.info("Http error %s",e) + logging.info("Http error %s", e) return None @@ -438,7 +438,9 @@ def get_codes_from_zip(zip_file): return codes return {} except Exception as e: - logging.info("There is some problem in processing the file %s File name is: %s",e,zipfile) + logging.info( + "There is some problem in processing the file %s File name is: %s", + e, zipfile) def write_csv(csv_file_path, csv_columns, csv_rows): @@ -532,7 +534,7 @@ def get_observations_from_zip(zip_file, svs): return obs_csv_rows # Exception can be ignored as there might be some corrupted zip files from source except Exception as e: - logging.info("There is problem while processing the zip file: %s",e) + logging.info("There is problem while processing the zip file: %s", e) return [] From 0f3d2fe92754ab075240aad5cc474b7794e5390f Mon Sep 17 00:00:00 2001 From: Ashok Prajapat Date: Thu, 9 Jan 2025 04:52:09 +0000 Subject: [PATCH 6/7] Addressed all the comments given --- scripts/world_bank/datasets/datasets.py | 14 +++- scripts/world_bank/datasets/manifest.json | 83 +++++++++++++++++++++++ 2 files changed, 94 insertions(+), 3 deletions(-) create mode 
100644 scripts/world_bank/datasets/manifest.json diff --git a/scripts/world_bank/datasets/datasets.py b/scripts/world_bank/datasets/datasets.py index 0793fb4cbb..ecb3b44fdb 100644 --- a/scripts/world_bank/datasets/datasets.py +++ b/scripts/world_bank/datasets/datasets.py @@ -79,6 +79,8 @@ from itertools import repeat from datetime import datetime from retry import retry +import subprocess +import sys FLAGS = flags.FLAGS @@ -98,6 +100,7 @@ class Mode: flags.DEFINE_string('stat_vars_file', 'statvars.csv', 'Path to CSV file with Stat Var mappings.') +flags.DEFINE_string('output_dir', None, 'Path to output directory') ctx = create_urllib3_context() ctx.load_default_certs() @@ -452,7 +455,12 @@ def write_csv(csv_file_path, csv_columns, csv_rows): csv_writer.writerows(csv_rows) -def write_all_observations(stat_vars_file): +def write_all_observations(stat_vars_file, output_dir=None): + if output_dir: + os.makedirs(output_dir, exist_ok=True) + global OBSERVATIONS_DIR + OBSERVATIONS_DIR = output_dir + start = datetime.now() logging.info('Start: %s', start) @@ -639,7 +647,7 @@ def main(_): download_datasets() write_wb_codes() load_stat_vars(FLAGS.stat_vars_file) - write_all_observations(FLAGS.stat_vars_file) + write_all_observations(FLAGS.stat_vars_file, FLAGS.output_dir) else: match FLAGS.mode: case Mode.FETCH_DATASETS: @@ -651,7 +659,7 @@ def main(_): case Mode.LOAD_STAT_VARS: load_stat_vars(FLAGS.stat_vars_file) case Mode.WRITE_OBSERVATIONS: - write_all_observations(FLAGS.stat_vars_file) + write_all_observations(FLAGS.stat_vars_file, FLAGS.output_dir) case _: logging.error('No mode specified.') diff --git a/scripts/world_bank/datasets/manifest.json b/scripts/world_bank/datasets/manifest.json new file mode 100644 index 0000000000..e4b0d71567 --- /dev/null +++ b/scripts/world_bank/datasets/manifest.json @@ -0,0 +1,83 @@ +{ + "import_specifications": [ + { + "import_name": "WorldBankDatasets", + "curator_emails": ["prajapata@google.com"], + "provenance_url": 
"https://databank.worldbank.org/source/world-development-indicators", + "provenance_description": "The WorldBankDatasets contains data about multiple databases like World development Indicators,Jobs,Education Statistics", + "scripts": ["datasets.py"], + "import_inputs": [ + { + "template_mcf": "wb.tmcf", + "cleaned_csv": "output/observations/ASPIRE_CSV_obs.csv" + }, + { + "template_mcf": "wb.tmcf", + "cleaned_csv": "output/observations/DB_CSV_obs.csv" + }, + { + "template_mcf": "wb.tmcf", + "cleaned_csv": "output/observations/Economic_Fitness_CSV_obs.csv" + }, + { + "template_mcf": "wb.tmcf", + "cleaned_csv": "output/observations/EdStats_CSV_obs.csv" + }, + { + "template_mcf": "wb.tmcf", + "cleaned_csv": "output/observations/FINDEX_CSV_obs.csv" + }, + { + "template_mcf": "wb.tmcf", + "cleaned_csv": "output/observations/GFDD_CSV_obs.csv" + }, + { + "template_mcf": "wb.tmcf", + "cleaned_csv": "output/observations/GPFI_CSV_obs.csv" + }, + { + "template_mcf": "wb.tmcf", + "cleaned_csv": "output/observations/HCI_CSV_obs.csv" + }, + { + "template_mcf": "wb.tmcf", + "cleaned_csv": "output/observations/HEFPI_CSV_obs.csv" + }, + { + "template_mcf": "wb.tmcf", + "cleaned_csv": "output/observations/IDA_CSV_obs.csv" + }, + { + "template_mcf": "wb.tmcf", + "cleaned_csv": "output/observations/MDG_CSV_obs.csv" + }, + { + "template_mcf": "wb.tmcf", + "cleaned_csv": "output/observations/PovStats_CSV_obs.csv" + }, + { + "template_mcf": "wb.tmcf", + "cleaned_csv": "output/observations/SDG_CSV_obs.csv" + }, + { + "template_mcf": "wb.tmcf", + "cleaned_csv": "output/observations/SE4ALL_CSV_obs.csv" + }, + { + "template_mcf": "wb.tmcf", + "cleaned_csv": "output/observations/Subnational-Population_CSV_obs.csv" + }, + { + "template_mcf": "wb.tmcf", + "cleaned_csv": "output/observations/Subnational-Poverty_CSV_obs.csv" + }, + { + "template_mcf": "wb.tmcf", + "cleaned_csv": "output/observations/WWBI_CSV_obs.csv" + } + ], + "cron_schedule": "0 0 1 12 *" + } + ] + } + \ No newline at end of 
file From 1d5c68a85aa522ad6287564d541c5d96b9238b8f Mon Sep 17 00:00:00 2001 From: Ashok Prajapat Date: Fri, 10 Jan 2025 11:27:12 +0000 Subject: [PATCH 7/7] curator_emails set to blank --- scripts/world_bank/datasets/manifest.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/world_bank/datasets/manifest.json b/scripts/world_bank/datasets/manifest.json index e4b0d71567..056d8f0eab 100644 --- a/scripts/world_bank/datasets/manifest.json +++ b/scripts/world_bank/datasets/manifest.json @@ -2,7 +2,7 @@ "import_specifications": [ { "import_name": "WorldBankDatasets", - "curator_emails": ["prajapata@google.com"], + "curator_emails": [], "provenance_url": "https://databank.worldbank.org/source/world-development-indicators", "provenance_description": "The WorldBankDatasets contains data about multiple databases like World development Indicators,Jobs,Education Statistics", "scripts": ["datasets.py"],