[WIP] allow passing list of variables to [check_]aggregate[_region]() functions #306

Closed
96 changes: 63 additions & 33 deletions pyam/core.py
@@ -768,12 +768,12 @@ def normalize(self, inplace=False, **kwargs):
         return ret

     def aggregate(self, variable, components=None, method='sum', append=False):
-        """Compute the aggregate of timeseries components or sub-categories
+        """Aggregate timeseries components or sub-categories within each region

         Parameters
         ----------
-        variable: str
-            variable for which the aggregate should be computed
+        variable: str or list of str
+            variable(s) for which the aggregate will be computed
         components: list of str, default None
             list of variables, defaults to all sub-categories of `variable`
         method: func or str, default 'sum'
@@ -782,20 +782,43 @@ def aggregate(self, variable, components=None, method='sum', append=False):
             append the aggregate timeseries to `data` and return None,
             else return aggregate timeseries
         """
-        # default components to all variables one level below `variable`
-        components = components or self._variable_components(variable)
-
-        if not len(components):
-            msg = 'cannot aggregate variable `{}` because it has no components'
-            logger.info(msg.format(variable))
-
-            return
-
-        rows = self._apply_filters(variable=components)
-        _data = _aggregate(self.data[rows], 'variable', method)
+        # a list of variables requires default components (no manual list)
+        if islistable(variable) and components is not None:
+            raise ValueError('aggregating by list of variables cannot use '
+                             'custom components')
+
+        mapping = {}
+        msg = 'cannot aggregate variable `{}` because it has no components'
+        # if single variable
+        if isstr(variable):
+            # default components to all variables one level below `variable`
+            components = components or self._variable_components(variable)
+
+            if not len(components):
+                logger.info(msg.format(variable))
+                return
+
+            for c in components:
+                mapping[c] = variable
+
+        # else, use all variables one level below `variable` as components
+        else:
+            for v in variable if islistable(variable) else [variable]:
+                _components = self._variable_components(v)
+                if not len(_components):
+                    logger.info(msg.format(v))
+
+                for c in _components:
+                    mapping[c] = v
+
+        # rename all components to `variable` and aggregate
+        _df = self.data[self._apply_filters(variable=mapping.keys())].copy()
+        _df['variable'].replace(mapping, inplace=True)
+        _data = _aggregate(_df, [], method)

         # append to `self` or return as pd.Series
         if append is True:
-            self.append(_data, variable=variable, inplace=True)
+            self.append(_data, inplace=True)
         else:
             return _data

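The refactoring above replaces per-variable filtering with a single components-to-parent mapping: every component row is renamed to the variable it rolls up into, then everything is aggregated in one groupby pass. A minimal pandas sketch of that idea, with made-up values (this is not the package's actual `_aggregate` helper):

import pandas as pd

# toy long-format data: two parent variables, two components each
df = pd.DataFrame({
    'variable': ['Primary Energy|Coal', 'Primary Energy|Wind',
                 'Emissions|CO2|Energy', 'Emissions|CO2|AFOLU'],
    'year': [2005] * 4,
    'value': [0.5, 0.3, 4.0, 2.0],
})

# map every component to the variable it rolls up into
mapping = {'Primary Energy|Coal': 'Primary Energy',
           'Primary Energy|Wind': 'Primary Energy',
           'Emissions|CO2|Energy': 'Emissions|CO2',
           'Emissions|CO2|AFOLU': 'Emissions|CO2'}

# rename in one pass, then aggregate all variables in a single groupby
df['variable'] = df['variable'].replace(mapping)
print(df.groupby(['variable', 'year'])['value'].sum())
# Emissions|CO2   2005    6.0
# Primary Energy  2005    0.8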
@@ -805,8 +828,8 @@ def check_aggregate(self, variable, components=None, method='sum',

         Parameters
         ----------
-        variable: str
-            variable to be checked for matching aggregation of sub-categories
+        variable: str or list of str
+            variable(s) checked for matching aggregation of sub-categories
         components: list of str, default None
             list of variables, defaults to all sub-categories of `variable`
         method: func or str, default 'sum'
@@ -825,7 +848,7 @@ def check_aggregate(self, variable, components=None, method='sum',
         # filter and groupby data, use `pd.Series.align` for matching index
         rows = self._apply_filters(variable=variable)
         df_variable, df_components = (
-            _aggregate(self.data[rows], 'variable', method)
+            _aggregate(self.data[rows], [], method)
             .align(df_components)
         )

@@ -840,18 +863,20 @@ def check_aggregate_region(self, variable, region='World', subregions=None,
         if exclude_on_fail:
             self._exclude_on_fail(diff.index.droplevel([2, 3, 4]))

-        return IamDataFrame(diff, variable=variable).timeseries()
+        return IamDataFrame(diff).timeseries()

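With the list-based signature, checking several aggregates becomes a single call. A usage sketch, assuming `df` is an IamDataFrame whose components sum up consistently (variable names are illustrative):

# returns None when all aggregates match, else a timeseries of differences
assert df.check_aggregate(['Primary Energy', 'Emissions|CO2']) is None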
     def aggregate_region(self, variable, region='World', subregions=None,
                          components=False, method='sum', weight=None,
                          append=False):
-        """Compute the aggregate of timeseries over a number of regions
-        including variable components only defined at the `region` level
+        """Aggregate a timeseries over a number of subregions
+
+        This function allows adding variable sub-categories that are only
+        defined at the `region` level by setting `components=True`

         Parameters
         ----------
-        variable: str
-            variable for which the aggregate should be computed
+        variable: str or list of str
+            variable(s) for which the aggregate will be computed
         region: str, default 'World'
             dimension
         subregions: list of str
@@ -870,6 +895,11 @@ def aggregate_region(self, variable, region='World', subregions=None,
             append the aggregate timeseries to `data` and return None,
             else return aggregate timeseries
         """
+        if not isstr(variable) and components is not False:
+            msg = 'aggregating by list of variables with components ' \
+                  'is not supported'
+            raise ValueError(msg)
+
         if weight is not None and components is not False:
             msg = 'using weights and components in one operation not supported'
             raise ValueError(msg)
@@ -886,10 +916,10 @@

         # compute aggregate over all subregions
         subregion_df = self.filter(region=subregions)
-        cols = ['region', 'variable']
         rows = subregion_df._apply_filters(variable=variable)
         if weight is None:
-            _data = _aggregate(subregion_df.data[rows], cols, method=method)
+            col = 'region'
+            _data = _aggregate(subregion_df.data[rows], col, method=method)
         else:
             weight_rows = subregion_df._apply_filters(variable=weight)
             _data = _aggregate_weight(subregion_df.data[rows],
@@ -909,25 +939,26 @@ def aggregate_region(self, variable, region='World', subregions=None,
             components = set(r_comps).difference(sr_comps)

             if len(components):
+                # rename all components to `variable` and aggregate
                 rows = region_df._apply_filters(variable=components)
-                _data = _data.add(_aggregate(region_df.data[rows], cols),
-                                  fill_value=0)
+                _df = region_df.data[rows].copy()
+                _df['variable'] = variable
+                _data = _data.add(_aggregate(_df, 'region'), fill_value=0)

         if append is True:
-            self.append(_data, region=region, variable=variable, inplace=True)
+            self.append(_data, region=region, inplace=True)
         else:
             return _data

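A usage sketch of the extended `aggregate_region` signature (variable and region names are illustrative); as the guard above enforces, combining a list of variables with `components` raises a ValueError:

import pytest

# aggregate both variables from all subregions into 'World' and append
df.aggregate_region(['Primary Energy', 'Emissions|CO2'], region='World',
                    append=True)

# combining a list of variables with components raises a ValueError
with pytest.raises(ValueError):
    df.aggregate_region(['Primary Energy', 'Emissions|CO2'], components=True)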
     def check_aggregate_region(self, variable, region='World', subregions=None,
                                components=False, method='sum', weight=None,
                                exclude_on_fail=False, **kwargs):
-        """Check whether the region timeseries data match the aggregation
-        of components
+        """Check whether a timeseries matches the aggregation across subregions

         Parameters
         ----------
-        variable: str
-            variable to be checked for matching aggregation of subregions
+        variable: str or list of str
+            variable(s) to be checked for matching aggregation of subregions
         region: str, default 'World'
             region to be checked for matching aggregation of subregions
         subregions: list of str
@@ -960,7 +991,7 @@ def check_aggregate_region(self, variable, region='World', subregions=None,
             return

         df_region, df_subregions = (
-            _aggregate(self.data[rows], ['region', 'variable'])
+            _aggregate(self.data[rows], 'region')
             .align(df_subregions)
         )

@@ -976,8 +1007,7 @@ def check_aggregate_region(self, variable, region='World', subregions=None,
         if exclude_on_fail:
             self._exclude_on_fail(diff.index.droplevel([2, 3]))

-        col_args = dict(region=region, variable=variable)
-        return IamDataFrame(diff, **col_args).timeseries()
+        return IamDataFrame(diff, region=region).timeseries()

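The corresponding check, sketched with illustrative region and variable names; a return value of None signals that the regional aggregation is consistent, anything else is the timeseries of mismatches:

diff = df.check_aggregate_region(['Primary Energy', 'Emissions|CO2'],
                                 region='World',
                                 subregions=['reg_a', 'reg_b'])
assert diff is None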
     def _all_other_regions(self, region, variable):
         """Return list of regions other than `region` containing `variable`"""
6 changes: 3 additions & 3 deletions tests/conftest.py
@@ -8,7 +8,7 @@

 from datetime import datetime
-from pyam import IamDataFrame
+from pyam import IamDataFrame, IAMC_IDX


 here = os.path.dirname(os.path.realpath(__file__))
@@ -21,7 +21,7 @@
     ['model_a', 'scen_a', 'World', 'Primary Energy|Coal', 'EJ/y', 0.5, 3],
     ['model_a', 'scen_b', 'World', 'Primary Energy', 'EJ/y', 2, 7],
     ],
-    columns=['model', 'scenario', 'region', 'variable', 'unit', 2005, 2010],
+    columns=IAMC_IDX + [2005, 2010],
 )


@@ -61,7 +61,7 @@
     ['MESSAGE-GLOBIOM', 'a_scenario', 'AFR', 'Primary Energy', 'EJ/y', 2, 7],
     ['MESSAGE-GLOBIOM', 'a_scenario', 'World', 'Primary Energy', 'EJ/y', 3, 13],
     ],
-    columns=['model', 'scenario', 'region', 'variable', 'unit', 2005, 2010],
+    columns=IAMC_IDX + [2005, 2010],
 )

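For reference, `IAMC_IDX` is pyam's standard list of index columns, so the fixture change is purely cosmetic:

from pyam import IAMC_IDX

print(IAMC_IDX)  # ['model', 'scenario', 'region', 'variable', 'unit']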

112 changes: 92 additions & 20 deletions tests/test_feature_aggregate.py
@@ -8,6 +8,41 @@
 from conftest import TEST_DTS


+LONG_IDX = IAMC_IDX + ['year']
+
+PE_MAX_DF = pd.DataFrame([
+    ['model_a', 'scen_a', 'World', 'Primary Energy', 'EJ/y', 2005, 7.0],
+    ['model_a', 'scen_a', 'World', 'Primary Energy', 'EJ/y', 2010, 10.0],
+    ['model_a', 'scen_a', 'reg_a', 'Primary Energy', 'EJ/y', 2005, 5.0],
+    ['model_a', 'scen_a', 'reg_a', 'Primary Energy', 'EJ/y', 2010, 7.0],
+    ['model_a', 'scen_a', 'reg_b', 'Primary Energy', 'EJ/y', 2005, 2.0],
+    ['model_a', 'scen_a', 'reg_b', 'Primary Energy', 'EJ/y', 2010, 3.0],
+],
+    columns=LONG_IDX + ['value']
+)
+
+CO2_MAX_DF = pd.DataFrame([
+    ['model_a', 'scen_a', 'World', 'Emissions|CO2', 'EJ/y', 2005, 6.0],
+    ['model_a', 'scen_a', 'World', 'Emissions|CO2', 'EJ/y', 2010, 8.0],
+    ['model_a', 'scen_a', 'reg_a', 'Emissions|CO2', 'EJ/y', 2005, 4.0],
+    ['model_a', 'scen_a', 'reg_a', 'Emissions|CO2', 'EJ/y', 2010, 5.0],
+    ['model_a', 'scen_a', 'reg_b', 'Emissions|CO2', 'EJ/y', 2005, 2.0],
+    ['model_a', 'scen_a', 'reg_b', 'Emissions|CO2', 'EJ/y', 2010, 3.0],
+],
+    columns=LONG_IDX + ['value']
+)
+
+REG_IDX = ['model', 'scenario', 'variable', 'unit', 'year']
+
+PRICE_MAX_DF = pd.DataFrame([
+    ['model_a', 'scen_a', 'Price|Carbon', 'USD/tCO2', 2005, 10.0],
+    ['model_a', 'scen_a', 'Price|Carbon', 'USD/tCO2', 2010, 30.0],
+],
+    columns=REG_IDX + ['value']
+)
+
+
 def test_aggregate(aggregate_df):
     df = aggregate_df
@@ -21,18 +56,8 @@ def test_aggregate(aggregate_df):
     assert _df.check_aggregate('Primary Energy', components=components) is None

     # use other method (max) both as string and passing the function
-    idx = ['model', 'scenario', 'region', 'unit', 'year']
-    exp = pd.DataFrame([
-        ['model_a', 'scen_a', 'World', 'EJ/y', 2005, 7.0],
-        ['model_a', 'scen_a', 'World', 'EJ/y', 2010, 10.0],
-        ['model_a', 'scen_a', 'reg_a', 'EJ/y', 2005, 5.0],
-        ['model_a', 'scen_a', 'reg_a', 'EJ/y', 2010, 7.0],
-        ['model_a', 'scen_a', 'reg_b', 'EJ/y', 2005, 2.0],
-        ['model_a', 'scen_a', 'reg_b', 'EJ/y', 2010, 3.0],
-    ],
-        columns=idx + ['value']
-    ).set_index(idx).value
+    exp = PE_MAX_DF.set_index(LONG_IDX).value
     obs = df.aggregate('Primary Energy', method='max')
     pd.testing.assert_series_equal(obs, exp)

@@ -43,6 +68,31 @@ def test_aggregate(aggregate_df):
     pytest.raises(ValueError, df.aggregate, 'Primary Energy', method='foo')


+def test_aggregate_by_list(aggregate_df):
+    df = aggregate_df
+    var_list = ['Primary Energy', 'Emissions|CO2']
+
+    # primary energy and emissions are a direct sum (within each region)
+    assert df.check_aggregate(var_list) is None
+
+    # use other method (max) both as string and passing the function
+    exp = (
+        pd.concat([PE_MAX_DF, CO2_MAX_DF])
+        .set_index(LONG_IDX).value
+        .sort_index()
+    )
+
+    obs = df.aggregate(var_list, method='max')
+    pd.testing.assert_series_equal(obs, exp)
+
+    obs = df.aggregate(var_list, method=np.max)
+    pd.testing.assert_series_equal(obs, exp)
+
+    # using list of variables and components raises an error
+    components = ['Primary Energy|Coal', 'Primary Energy|Wind']
+    pytest.raises(ValueError, df.aggregate, var_list, components=components)
+
+
 def test_aggregate_region(aggregate_df):
     df = aggregate_df

@@ -72,13 +122,7 @@ def test_aggregate_region(aggregate_df):
                   weight='bar')

     # use other method (max) both as string and passing the function
-    idx = ['model', 'scenario', 'unit', 'year']
-    exp = pd.DataFrame([
-        ['model_a', 'scen_a', 'USD/tCO2', 2005, 10.0],
-        ['model_a', 'scen_a', 'USD/tCO2', 2010, 30.0]
-    ],
-        columns=idx + ['value']
-    ).set_index(idx).value
+    exp = PRICE_MAX_DF.set_index(REG_IDX).value
     obs = df.aggregate_region('Price|Carbon', method='max')
     pd.testing.assert_series_equal(obs, exp)

@@ -93,6 +137,34 @@ def test_aggregate_region(aggregate_df):
                   weight='bar')


+def test_aggregate_region_by_list(aggregate_df):
+    df = aggregate_df
+    var_list = ['Primary Energy', 'Primary Energy|Coal', 'Primary Energy|Wind']
+
+    # primary energy and sub-categories are a direct sum (across regions)
+    assert df.check_aggregate_region(var_list) is None
+
+    # emissions and carbon price are _not_ a direct sum (across regions)
+    var_list = ['Price|Carbon', 'Emissions|CO2']
+    assert df.check_aggregate_region(var_list) is not None
+
+    # using list of variables and components raises an error
+    pytest.raises(ValueError, df.aggregate_region, var_list, components=True)
+
+    # using list of variables and weight raises an error (inconsistent weight)
+    pytest.raises(ValueError, df.aggregate_region, var_list, weight=True)
+
+    # use other method (max) both as string and passing the function
+    _co2_df = CO2_MAX_DF[CO2_MAX_DF.region == 'World'].drop(columns='region')
+    exp = pd.concat([_co2_df, PRICE_MAX_DF]).set_index(REG_IDX).value
+
+    obs = df.aggregate_region(var_list, method='max')
+    pd.testing.assert_series_equal(obs, exp)
+
+    obs = df.aggregate_region(var_list, method=np.max)
+    pd.testing.assert_series_equal(obs, exp)
+
+
 def test_missing_region(check_aggregate_df):
     # for now, this test makes sure that this operation works as expected
     exp = check_aggregate_df.aggregate_region('Primary Energy', region='foo')
@@ -391,8 +463,8 @@ def test_aggregate_region_components_handling(check_aggregate_regional_df,
     res = tdf.aggregate_region("Emissions|N2O", components=components,
                                subregions=["REUROPE", "RASIA"])
     exp_idx = pd.MultiIndex.from_product(
-        [["AIM"], ["cscen"], ["Mt N/yr"], [2005, 2010]],
-        names=["model", "scenario", "unit", "year"]
+        [["AIM"], ["cscen"], ['Emissions|N2O'], ["Mt N/yr"], [2005, 2010]],
+        names=["model", "scenario", "variable", "unit", "year"]
     )
     exp = pd.Series(exp_vals, index=exp_idx)
     exp.name = "value"