Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bls_jolts_refresh_import #1153

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
171,024 changes: 92,736 additions & 78,288 deletions scripts/us_bls/jolts/BLSJolts.csv

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this 4.83 MB file necessary for the import? if yes, please use Git LFS to check-in this file.

Copy link
Author

@Girish3320 Girish3320 Dec 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

File not needed, it's an output file. Removed output csv and mcf files.

Large diffs are not rendered by default.

1,682 changes: 1,009 additions & 673 deletions scripts/us_bls/jolts/BLSJolts_StatisticalVariables.mcf

Large diffs are not rendered by default.

95 changes: 95 additions & 0 deletions scripts/us_bls/jolts/README.md
Original file line number Diff line number Diff line change
@@ -1 +1,96 @@
# Scripts for importing dataset from the Search Results U.S. Bureau of Labor Statistics (BLS) Job Openings and Labor Turnover Survey (JOLTS)

Data is fetched from various data sources within the BLS site
then combined and cleaned.

Statistical Variables are generated and cleaned CSV is output.

Download the requirements.txt via pip and execute the file with Python 3.

Dataset being processed: https://download.bls.gov/pub/time.series/jt/

JOLTS dataset contains both NAICS industry codes and BLS jolts aggregations.
# Existing NAICS Codes are mapped directly while
# custom JOLTS codes include a colon distinguishing their new name.
_CODE_MAPPINGS = {
'000000': '000000:Total nonfarm', # New Code
'100000': '10',
'110099': '110099:Mining and logging', # New Code
'230000': '23',
'300000': '300000:Manufacturing', # New Code
'320000': '320000:Durable goods manufacturing', # New Code
'340000': '340000:Nondurable goods manufacturing', # New Code
'400000': '400000:Trade, transportation, and utilities', # New Code
'420000': '42',
'440000': '44',
'480099': '480099:Transportation warehousing, and utilities', # New Code
'510000': '51',
'510099': '510099:Financial activities', # New Code
'520000': '52',
'530000': '53',
'540099': '540099:Professional and business services', # New Code
'600000': '600000:Education and health services', # New Code
'610000': '61',
'620000': '62',
'700000': '700000:Leisure and hospitality', # New Code
'710000': '71',
'720000': '72',
'810000': '81',
'900000': '900000:Government', # New Code
'910000': '910000:Federal', # New Code
'920000': '92',
'923000': '923000:State and local government education', # New Code
'929000':
'929000:State and local government excluding education' # New Code
}


"""Fetches and combines BLS Jolts data sources.

Downloads detailed series information from the entire JOLTS dataset.
Each of the files is read, combined into a single dataframe, and processed.

Returns:
jolts_df: The 6 job data categories by industry, year, and adjustment,
as a data frame.
schema_mapping: List of tuples that contains information for each dataset.
"""

Additional information about each dataframe.
1. Tuple Format: Statistical Variable name, Stat Var population,
2. Stat Var Job Change Type If Relevant, Dataframe for Stat Var.

"""Creates Statistical Variable nodes.

A new statistical industry is needed for each of the 6 job variables
and for every industry.
The industry codes may be either NAICS or BLS_JOLTS aggregations.
The schema_mapping is used for additional information for
each of the 6 job variables. These new variables are written
to the statistical variables mcf file.

Args:
jolts_df: The df of BLS Jolts data created by generate_cleaned_dataframe.
schema_mapping: The schema mapping created by generate_cleaned_dataframe.
"""
template_stat_var = """
Node: dcid:{STAT_CLASS}_NAICS{INDUSTRY}_{ADJUSTMENT}
typeOf: dcs:StatisticalVariable
populationType: {POPULATION}
jobChangeEvent: dcs:{JOB_CHANGE_EVENT}
statType: dcs:measuredValue
measuredProperty: dcs:count
measurementQualifier: {BLS_ADJUSTMENT}
naics: dcid:NAICS/{INDUSTRY}
"""

""" Executes the downloading, preprocessing, and outputting of
required MCF and CSV for JOLTS data.
"""

IMPORTANT:
If any new statistical variable comes in future , it is to be added to the dictionaries :
1. _dcid_map in map_config.py
2. _mcf_map in mcf_config.py


87 changes: 68 additions & 19 deletions scripts/us_bls/jolts/bls_jolts.py

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no logging module used in the script

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please implement the retry module to download all the files.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please user retry module to implement this. Please refer to https://pypi.org/project/retry/

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added, along with modes download or process.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please implement mode flag to control the execution flow of the script like download or process.

Copy link
Author

@Girish3320 Girish3320 Dec 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,18 @@

Statistical Variables are generated and cleaned CSV is output.

Download the requirements_all.txt via pip and execute the file with Python 3.
Download the requirements.txt via pip and execute the file with Python 3.

Dataset being processed: https://download.bls.gov/pub/time.series/jt/
"""
import sys
import os
import textwrap
from absl import app
import pandas as pd
import requests
from map_config import _dcid_map
import fileinput

# JOLTS dataset contains both NAICS industry codes and BLS jolts aggregations.
# Existing NAICS Codes are mapped directly while
Expand Down Expand Up @@ -75,50 +80,78 @@ def generate_cleaned_dataframe():
"""
# Series descriptions are used for adjustment status and industry code.
exp_series_columns = [
'series_id', 'seasonal', 'industry_code', 'region_code',
'dataelement_code', 'ratelevel_code', 'footnote_codes', 'begin_year',
'begin_period', 'end_year', 'end_period'
'series_id', 'seasonal', 'industry_code', 'state_code', 'area_code',
'sizeclass_code', 'dataelement_code', 'ratelevel_code',
'footnote_codes', 'begin_year', 'begin_period', 'end_year', 'end_period'
]

header = {'User-Agent': 'support@datacommons.org'}

series_desc = pd.read_csv(
"https://download.bls.gov/pub/time.series/jt/jt.series",
storage_options=header,
converters={'industry_code': str},
sep="\\s+")
sep="\\t")
series_desc.columns = exp_series_columns
series_desc["series_id"] = series_desc["series_id"].apply(
lambda x: x.strip())

series_desc.to_csv("jolts_input_jt_series.csv")
assert len(series_desc.columns) == len(exp_series_columns)
assert (series_desc.columns == exp_series_columns).all()
series_desc = series_desc.set_index("series_id")

# Download various series datapoints
#job_openings = pd.read_csv(

job_openings = pd.read_csv(
"https://download.bls.gov/pub/time.series/jt/jt.data.2.JobOpenings",
storage_options=header,
sep="\\s+")
job_openings.to_csv("jolts_input_jt_job_openings.csv")

job_hires = pd.read_csv(
"https://download.bls.gov/pub/time.series/jt/jt.data.3.Hires",
storage_options=header,
sep="\\s+")
job_hires.to_csv("jolts_input_jt_job_hires.csv")

total_seps = pd.read_csv(
"https://download.bls.gov/pub/time.series/jt/jt.data.4.TotalSeparations", # pylint: disable=line-too-long
storage_options=header,
sep="\\s+")
total_seps.to_csv("jolts_input_jt_totlal_separations.csv")

total_quits = pd.read_csv(
"https://download.bls.gov/pub/time.series/jt/jt.data.5.Quits",
storage_options=header,
sep="\\s+")
total_quits.to_csv("jolts_input_jt_total_quits.csv")

total_layoffs = pd.read_csv(
"https://download.bls.gov/pub/time.series/jt/jt.data.6.LayoffsDischarges", # pylint: disable=line-too-long
storage_options=header,
sep="\\s+")
total_layoffs.to_csv("jolts_input_jt_total_layoffs.csv")

total_other_seps = pd.read_csv(
"https://download.bls.gov/pub/time.series/jt/jt.data.7.OtherSeparations", # pylint: disable=line-too-long
storage_options=header,
sep="\\s+")
total_other_seps.to_csv("jolts_input_jt_total_other_separations.csv")

# Additional information about each dataframe.
# Tuple Format: Statistical Variable name, Stat Var population,
# Stat Var Job Change Type If Relevant, Dataframe for Stat Var.
schema_mapping = [
("NumJobOpening", "schema:JobPosting", "", job_openings),
("NumJobHire", "dcs:BLSWorker", "Hire", job_hires),
("NumSeparation", "dcs:BLSWorker", "Separation", total_seps),
("NumVoluntarySeparation", "dcs:BLSWorker", "VoluntarySeparation",
total_quits),
("NumInvoluntarySeparation", "dcs:BLSWorker", "InvoluntarySeparation",
total_layoffs),
("NumOtherSeparation", "dcs:BLSWorker", "OtherSeparation",
("Count_JobPosting", "schema:JobPosting", "", job_openings),
("Count_Worker_Hire", "dcs:BLSWorker", "Hire", job_hires),
("Count_Worker_Separation", "dcs:BLSWorker", "Separation", total_seps),
("Count_Worker_VoluntarySeparation", "dcs:BLSWorker",
"VoluntarySeparation", total_quits),
("Count_Worker_InvoluntarySeparation", "dcs:BLSWorker",
"InvoluntarySeparation", total_layoffs),
("Count_Worker_OtherSeparation", "dcs:BLSWorker", "OtherSeparation",
total_other_seps),
]
# Combine datasets into a single dataframe including origin of data.
Expand All @@ -135,7 +168,7 @@ def generate_cleaned_dataframe():
df.loc[:, 'statistical_variable'] = schema_name
df.loc[:, 'job_change_event'] = job_change_event
df.loc[:, 'population_type'] = population_type
jolts_df = jolts_df.append(df)
jolts_df = jolts_df._append(df)

# Drop non-monthly data and throw away slice.
jolts_df = jolts_df.query("period != 'M13'").copy()
Expand All @@ -149,15 +182,21 @@ def period_year_to_iso_8601(row):
jolts_df['Date'] = jolts_df.apply(period_year_to_iso_8601, axis=1)

# Add relevant columns from series information.
series_cols = ['industry_code', 'region_code', 'seasonal', 'ratelevel_code']
series_cols = [
'industry_code', 'state_code', 'seasonal', 'ratelevel_code',
'sizeclass_code'
]

jolts_df = jolts_df.merge(series_desc[series_cols],
left_on=["series_id"],
right_index=True)

jolts_df.to_csv("before_query.csv", index=False)
# Drop rate data, preliminary data, and non-national data.
jolts_df = jolts_df.query("ratelevel_code == 'L'")
jolts_df = jolts_df.query("footnote_codes != 'P'")
jolts_df = jolts_df.query("region_code == '00'")
jolts_df = jolts_df.query("state_code == '00'")
jolts_df = jolts_df.query('sizeclass_code == 0')
jolts_df.to_csv("after_query.csv", index=False)

# Map industries.
def jolts_code_map(row):
Expand All @@ -184,12 +223,15 @@ def row_to_stat_var(row):
seasonal_adjustment = row['seasonal_adjustment']

return (
f"dcs:{base_stat_var}_NAICS_{industry_code}_{seasonal_adjustment}")
f"dcs:{base_stat_var}_NAICS{industry_code}_{seasonal_adjustment}")

# Build map to Statistical Variable.
jolts_df['seasonal_adjustment'] = jolts_df['seasonal'].apply(
lambda adj: "Adjusted" if adj == "S" else "Unadjusted")
jolts_df['StatisticalVariable'] = jolts_df.apply(row_to_stat_var, axis=1)
for old, new in _dcid_map.items():
jolts_df['StatisticalVariable'] = jolts_df[
'StatisticalVariable'].str.replace(old, new, regex=False)
jolts_df['Value'] = jolts_df['value']

return jolts_df, schema_mapping
Expand All @@ -210,7 +252,7 @@ def create_statistical_variables(jolts_df, schema_mapping):
schema_mapping: The schema mapping created by generate_cleaned_dataframe.
"""
template_stat_var = """
Node: dcid:{STAT_CLASS}_NAICS_{INDUSTRY}_{ADJUSTMENT}
Node: dcid:{STAT_CLASS}_NAICS{INDUSTRY}_{ADJUSTMENT}
typeOf: dcs:StatisticalVariable
populationType: {POPULATION}
jobChangeEvent: dcs:{JOB_CHANGE_EVENT}
Expand All @@ -222,12 +264,19 @@ def create_statistical_variables(jolts_df, schema_mapping):
# Map industry and seasonal adjustment to statistical variable name.
adjustment_types = [("Adjusted", "dcs:BLSSeasonallyAdjusted"),
("Unadjusted", "dcs:BLSSeasonallyUnadjusted")]
# adjustment_types = [("Adjusted", "dcs:Adjusted"),
# ("Unadjusted", "dcs:Unadjusted")]

# Output the schema mapping to a new file.
with open("BLSJolts_StatisticalVariables.mcf", "w+", newline="") as f_out:
for schema_name, pop_type, job_change_event, _ in schema_mapping:
for industry_code in list(jolts_df['industry_code'].unique()):
for adjusted_dcid_map, adjusted_schema in adjustment_types:
if adjusted_schema == "dcs:BLSSeasonallyAdjusted":
adjusted_schema = "dcs:Adjusted"
else:
adjusted_schema = "dcs:Unadjusted"

# Create new schema object.
stat_var_schema = textwrap.dedent(template_stat_var)

Expand Down
29 changes: 13 additions & 16 deletions scripts/us_bls/jolts/manifest.json
Original file line number Diff line number Diff line change
@@ -1,23 +1,20 @@
{
"import_specifications": [
"import_specifications": [
{
"import_name": "BLS_JOLTS",
"curator_emails": ["koppad@google.com"],
"provenance_url": "https://www.bls.gov/jlt/",
"provenance_description": "U.S. Bureau of Labor Statistics data on job openings, hires, and separations.",
"scripts": ["bls_jolts.py"],
"import_inputs": [
{
"import_name": "BLS_JOLTS",
"curator_emails": [
"ianjc@google.com"
],
"provenance_url": "https://www.bls.gov/jlt/",
"provenance_description": "U.S. Bureau of Labor Statistics data on job openings, hires, and separations.",
"scripts": [
"bls_jolts.py"
],
"import_inputs": [
{
"template_mcf": "BLSJolts.tmcf",
"cleaned_csv": "BLSJolts.csv",
"node_mcf": "BLSJolts_StatisticalVariables.mcf"
}
],
"cron_schedule": "0 10 21 * *"
}
]
],
"cron_schedule": "0 10 21 * *"
}
]
}

Loading
Loading