add method and weight feature to (region) aggregation #305

Merged

Changes from 22 commits (27 commits in total)
10cf203
use full `TEST_DF` for unit tests
danielhuppmann Dec 13, 2019
85afde4
use full `TEST_DF` for unit tests specific for 'year' feature
danielhuppmann Dec 13, 2019
08b894a
replace `meta_df` by `test_df` across all tests
danielhuppmann Dec 13, 2019
4636f90
appease stickler
danielhuppmann Dec 14, 2019
c22dca0
merge relevant changes from `peterkolp:region_aggregation_mip_feature`
danielhuppmann Dec 14, 2019
7ef3516
docstring clean-up
danielhuppmann Dec 14, 2019
9969c74
set `components=False` as default in `[check_]aggregate_region()`
danielhuppmann Dec 14, 2019
6dc3228
fix `method` docstring, add `weights` kwarg
danielhuppmann Dec 14, 2019
978829b
move internal function `_all_other_regions()`
danielhuppmann Dec 14, 2019
bf5813c
update docstring
danielhuppmann Dec 14, 2019
02adebc
speed-up of `aggregate_region` (no cloning of IamDataFrame)
danielhuppmann Dec 14, 2019
4a5e359
fix a kwarg default, add docstrings
danielhuppmann Dec 14, 2019
fa152e2
speed up `aggregate_region()` even more
danielhuppmann Dec 14, 2019
4e7fdc2
add feature to do weighted average over regions
danielhuppmann Dec 14, 2019
92b292c
refactor kwarg and auxiliary function to `weight`
danielhuppmann Dec 14, 2019
08ce709
update docstrings (preparing for new test data)
danielhuppmann Dec 16, 2019
55f21c0
add unit test for `check_aggregate_region()`
danielhuppmann Dec 16, 2019
14de79f
add test for `method` kwarg in `check_aggregate_region()`
danielhuppmann Dec 16, 2019
a3901a1
raise if variable & weight of inconsistent index in `aggregate_region()`
danielhuppmann Dec 17, 2019
9688aa1
make full-agg-feature test data complete
danielhuppmann Dec 17, 2019
63a5d8a
add tests for `aggregate()`
danielhuppmann Dec 17, 2019
ba71cc7
appease stickler
danielhuppmann Dec 17, 2019
2d50535
appease stickler again
danielhuppmann Dec 17, 2019
c582003
third time stickler
danielhuppmann Dec 17, 2019
b727cee
merge from `master`
danielhuppmann Dec 23, 2019
ee630aa
add `mean` to KNOWN_FUNCS (review comment by @gidden)
danielhuppmann Dec 23, 2019
01637b7
add to release notes
danielhuppmann Dec 23, 2019
2 changes: 1 addition & 1 deletion doc/source/tutorials/checking_databases.ipynb
@@ -4059,7 +4059,7 @@
"source": [
"for variable in consistent_df.filter(level=1).variables():\n",
" diff = consistent_df.check_aggregate_region(\n",
" variable, \n",
" variable, components=True,\n",
" **np_isclose_args\n",
" )\n",
" assert diff is None"
148 changes: 110 additions & 38 deletions pyam/core.py
100644 → 100755
@@ -38,6 +38,7 @@
YEAR_IDX,
IAMC_IDX,
SORT_IDX,
KNOWN_FUNCS
)
from pyam.read_ixmp import read_ix
from pyam.timeseries import fill_series
@@ -766,7 +767,7 @@ def normalize(self, inplace=False, **kwargs):
if not inplace:
return ret

def aggregate(self, variable, components=None, append=False):
def aggregate(self, variable, components=None, method='sum', append=False):
"""Compute the aggregate of timeseries components or sub-categories

Parameters
@@ -775,6 +776,8 @@ def aggregate(self, variable, components=None, append=False):
variable for which the aggregate should be computed
components: list of str, default None
list of variables, defaults to all sub-categories of `variable`
method: func or str, default 'sum'
method to use for aggregation, e.g. np.mean, np.sum, 'min', 'max'
append: bool, default False
append the aggregate timeseries to `data` and return None,
else return aggregate timeseries
@@ -789,15 +792,15 @@ def aggregate(self, variable, components=None, append=False):
return

rows = self._apply_filters(variable=components)
_data = _aggregate(self.data[rows], 'variable')
_data = _aggregate(self.data[rows], 'variable', method)

if append is True:
self.append(_data, variable=variable, inplace=True)
else:
return _data
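
A minimal usage sketch of the new `method` kwarg (illustrative only; the data and values below are assumptions, not part of this diff):

import numpy as np
import pandas as pd
from pyam import IamDataFrame

# toy data: a parent variable and two sub-categories in one region
df = IamDataFrame(pd.DataFrame(
    [['model_a', 'scen_a', 'World', 'Primary Energy', 'EJ/y', 10, 15],
     ['model_a', 'scen_a', 'World', 'Primary Energy|Coal', 'EJ/y', 7, 10],
     ['model_a', 'scen_a', 'World', 'Primary Energy|Wind', 'EJ/y', 3, 5]],
    columns=['model', 'scenario', 'region', 'variable', 'unit', 2005, 2010]))

pe_sum = df.aggregate('Primary Energy')                 # default 'sum': 10, 15
pe_max = df.aggregate('Primary Energy', method='max')   # string alias: 7, 10
pe_min = df.aggregate('Primary Energy', method=np.min)  # or any callable: 3, 5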

def check_aggregate(self, variable, components=None, exclude_on_fail=False,
multiplier=1, **kwargs):
def check_aggregate(self, variable, components=None, method='sum',
exclude_on_fail=False, multiplier=1, **kwargs):
"""Check whether a timeseries matches the aggregation of its components

Parameters
Expand All @@ -806,6 +809,8 @@ def check_aggregate(self, variable, components=None, exclude_on_fail=False,
variable to be checked for matching aggregation of sub-categories
components: list of str, default None
list of variables, defaults to all sub-categories of `variable`
method: func or str, default 'sum'
method to use for aggregation, e.g. np.mean, np.sum, 'min', 'max'
exclude_on_fail: boolean, default False
flag scenarios failing validation as `exclude: True`
multiplier: number, default 1
@@ -820,7 +825,8 @@ def check_aggregate(self, variable, components=None, exclude_on_fail=False,
# filter and groupby data, use `pd.Series.align` for matching index
rows = self._apply_filters(variable=variable)
df_variable, df_components = (
_aggregate(self.data[rows], 'variable').align(df_components)
_aggregate(self.data[rows], 'variable', method)
.align(df_components)
)

# use `np.isclose` for checking match
@@ -837,7 +843,8 @@ def check_aggregate(self, variable, components=None, exclude_on_fail=False,
return IamDataFrame(diff, variable=variable).timeseries()
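
A companion sketch for the check, reusing the toy `df` from the sketch above (`check_aggregate` returns None when the timeseries matches the aggregate of its components):

# 'Primary Energy' equals Coal + Wind in the toy data, so the check passes
assert df.check_aggregate('Primary Energy', method='sum') is None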

def aggregate_region(self, variable, region='World', subregions=None,
components=None, append=False):
components=False, method='sum', weight=None,
append=False):
"""Compute the aggregate of timeseries over a number of regions
including variable components only defined at the `region` level

@@ -849,18 +856,26 @@ def aggregate_region(self, variable, region='World', subregions=None,
dimension
subregions: list of str
list of subregions, defaults to all regions other than `region`
components: list of str
list of variables to include in the aggregate from the `region`
level, defaults to all sub-categories of `variable` included in
`region` but not in any of `subregions`
components: bool or list of str, default False
variables at the `region` level to be included in the aggregation
(ignored if False); if `True`, use all sub-categories of `variable`
included in `region` but not in any of the `subregions`;
or explicit list of variables
method: func or str, default 'sum'
method to use for aggregation, e.g. np.mean, np.sum, 'min', 'max'
weight: str, default None
variable to use as weight for the aggregation
(currently only supported with `method='sum'`)
append: bool, default False
append the aggregate timeseries to `data` and return None,
else return aggregate timeseries
"""
if weight is not None and components is not False:
msg = 'using weights and components in one operation not supported'
raise ValueError(msg)

# default subregions to all regions other than `region`
if subregions is None:
rows = self._apply_filters(variable=variable)
subregions = set(self.data[rows].region) - set([region])
subregions = subregions or self._all_other_regions(region, variable)

if not len(subregions):
msg = 'cannot aggregate variable `{}` to `{}` because it does not'\
@@ -872,30 +887,40 @@ def aggregate_region(self, variable, region='World', subregions=None,
# compute aggregate over all subregions
subregion_df = self.filter(region=subregions)
cols = ['region', 'variable']
_data = _aggregate(subregion_df.filter(variable=variable).data, cols)

# add components at the `region` level, defaults to all variables one
# level below `variable` that are only present in `region`
with adjust_log_level(logger):
region_df = self.filter(region=region)

rdf_comps = region_df._variable_components(variable, level=None)
srdf_comps = subregion_df._variable_components(variable, level=None)
components = components or set(rdf_comps).difference(srdf_comps)

if len(components):
rows = region_df._apply_filters(variable=components)
_data = _data.add(_aggregate(region_df.data[rows], cols),
fill_value=0)
rows = subregion_df._apply_filters(variable=variable)
if weight is None:
_data = _aggregate(subregion_df.data[rows], cols, method=method)
else:
weight_rows = subregion_df._apply_filters(variable=weight)
_data = _aggregate_weight(subregion_df.data[rows],
subregion_df.data[weight_rows], method)

# if not `components=False`, add components at the `region` level
if components is not False:
with adjust_log_level(logger):
region_df = self.filter(region=region)

# if `True`, auto-detect `components` at the `region` level,
# defaults to variables below `variable` only present in `region`
if components is True:
level = dict(level=None)
r_comps = region_df._variable_components(variable, **level)
sr_comps = subregion_df._variable_components(variable, **level)
components = set(r_comps).difference(sr_comps)

if len(components):
rows = region_df._apply_filters(variable=components)
_data = _data.add(_aggregate(region_df.data[rows], cols),
fill_value=0)

if append is True:
self.append(_data, region=region, variable=variable, inplace=True)
else:
return _data
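
A sketch of the new weighted aggregation, assuming an IamDataFrame `df` built from the FULL_FEATURE_DF test data added in this PR (see tests/conftest.py below); note that `weight` requires `method='sum'` and `components=False`:

# emissions-weighted average of the carbon price over all subregions
price = df.aggregate_region('Price|Carbon', weight='Emissions|CO2')

# combining `weight` with `components` raises a ValueError:
# df.aggregate_region('Price|Carbon', weight='Emissions|CO2', components=True)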

def check_aggregate_region(self, variable, region='World', subregions=None,
components=None, exclude_on_fail=False,
**kwargs):
components=False, method='sum', weight=None,
exclude_on_fail=False, **kwargs):
"""Check whether the region timeseries data match the aggregation
of components

@@ -907,16 +932,23 @@ def check_aggregate_region(self, variable, region='World', subregions=None,
region to be checked for matching aggregation of subregions
subregions: list of str
list of subregions, defaults to all regions other than `region`
components: list of str, default None
list of variables, defaults to all sub-categories of `variable`
included in `region` but not in any of `subregions`
components: bool or list of str, default False
variables at the `region` level to be included in the aggregation
(ignored if False); if `True`, use all sub-categories of `variable`
included in `region` but not in any of the `subregions`;
or explicit list of variables
method: func or str, default 'sum'
method to use for aggregation, e.g. np.mean, np.sum, 'min', 'max'
weight: str, default None
variable to use as weight for the aggregation
(currently only supported with `method='sum'`)
exclude_on_fail: boolean, default False
flag scenarios failing validation as `exclude: True`
kwargs: passed to `np.isclose()`
"""
# compute aggregate from subregions, return None if no subregions
df_subregions = self.aggregate_region(variable, region, subregions,
components)
components, method, weight)
if df_subregions is None:
return

@@ -947,6 +979,11 @@ def check_aggregate_region(self, variable, region='World', subregions=None,
col_args = dict(region=region, variable=variable)
return IamDataFrame(diff, **col_args).timeseries()
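
The analogous check, under the same assumptions as the sketch above (returns None when the `region` timeseries matches the weighted aggregate of the subregions):

assert df.check_aggregate_region('Price|Carbon',
                                 weight='Emissions|CO2') is None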

def _all_other_regions(self, region, variable):
"""Return list of regions other than `region` containing `variable`"""
rows = self._apply_filters(variable=variable)
return set(self.data[rows].region) - set([region])

def _variable_components(self, variable, level=0):
"""Get all components (sub-categories) of a variable for a given level

@@ -958,7 +995,7 @@ def _variable_components(self, variable, level=0):
level=level)]

def check_internal_consistency(self, **kwargs):
"""Check whether the database is internally consistent
"""Check whether a scenario ensemble is internally consistent

We check that all variables are equal to the sum of their sectoral
components and that all the regions add up to the World total. If
@@ -981,7 +1018,8 @@ def check_internal_consistency(self, **kwargs):
if diff_agg is not None:
inconsistent_vars[variable + "-aggregate"] = diff_agg

diff_regional = self.check_aggregate_region(variable, **kwargs)
diff_regional = self.check_aggregate_region(variable,
components=True, **kwargs)
danielhuppmann marked this conversation as resolved.
if diff_regional is not None:
inconsistent_vars[variable + "-regional"] = diff_regional

@@ -1444,11 +1482,45 @@ def _meta_idx(data):
return data[META_IDX].drop_duplicates().set_index(META_IDX).index


def _aggregate(df, by):
def _aggregate(df, by, method=np.sum):
"""Aggregate `df` by specified column(s), return indexed `pd.Series`"""
by = [by] if isstr(by) else by
cols = [c for c in list(df.columns) if c not in ['value'] + by]
return df.groupby(cols).sum()['value']
# pick aggregator func (default: sum)
return df.groupby(cols)['value'].agg(_get_method_func(method))


def _aggregate_weight(df, weight, method):
Collaborator: If this is for weighted average, then surely `method` is always `sum`/`np.sum`? Or is this just meant as a placeholder for future methods?
Consider also using `np.nansum` instead, so that the calculation won't return `nan` if a value is missing.

Member Author: 1. The `method` arg partly anticipates more supported functions in the future, and partly lets this function check that the chosen method is indeed `sum` (moving that check to the lower-level function also allows reusing `_aggregate_weight()` in other functions later without re-implementing the check in multiple higher-level functions).
2. The data table cannot have `nan` (they are removed at initialisation), so this point is moot. If a data value or the weight value is missing (i.e., inconsistent series indexes), a `ValueError` is raised.

"""Aggregate `df` by regions with weights, return indexed `pd.Series`"""
# only summation allowed with weights
if method not in ['sum', np.sum]:
raise ValueError('only method `np.sum` allowed for weighted average')

_data = _get_value_col(df, YEAR_IDX)
_weight = _get_value_col(weight, YEAR_IDX)

if not _data.index.equals(_weight.index):
raise ValueError('inconsistent index between variable and weight')

cols = META_IDX + ['year']
return (_data * _weight).groupby(cols).sum() / _weight.groupby(cols).sum()

Collaborator: Would you put `.agg(method)` here instead of `.sum()`?

Collaborator: Do you also want to limit inputs to KNOWN_FUNCS?

Member Author: Only summation is allowed (for the time being) anyway, so imho no need to be more complicated...

Member: Agree that we don't need anything more complicated here, but maybe worth putting in some comments about what would need to be changed in the future if more than `sum` is supported?
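
A minimal standalone sketch of the formula above, plugging in the 2005 values from the FULL_FEATURE_DF test data added in this PR (Price|Carbon: reg_a=1, reg_b=10; weight Emissions|CO2: reg_a=6, reg_b=3):

import pandas as pd

data = pd.Series({'reg_a': 1.0, 'reg_b': 10.0})
weight = pd.Series({'reg_a': 6.0, 'reg_b': 3.0})

# (1 * 6 + 10 * 3) / (6 + 3) = 36 / 9 = 4, matching 'World' Price|Carbon in 2005
assert (data * weight).sum() / weight.sum() == 4.0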



def _get_method_func(method):
"""Translate a string to a known method"""
if not isstr(method):
return method

if method in KNOWN_FUNCS:
return KNOWN_FUNCS[method]

# raise error if `method` is a string but not in dict of known methods
raise ValueError('method `{}` is not a known aggregator'.format(method))
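
A quick sketch of how the lookup resolves (within the module context above):

import numpy as np

assert _get_method_func('max') is np.max      # string alias via KNOWN_FUNCS
assert _get_method_func(np.mean) is np.mean   # callables pass through unchanged
# _get_method_func('median') raises ValueError (not in KNOWN_FUNCS)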


def _get_value_col(df, cols):
"""Return the value column as `pd.Series sorted by index"""
return df.set_index(cols)['value'].sort_index()


def _raise_filter_error(col):
2 changes: 2 additions & 0 deletions pyam/utils.py
@@ -33,6 +33,8 @@
+ ['{}{}'.format(i, j) for i, j in itertools.product(
string.ascii_uppercase, string.ascii_uppercase)]))

KNOWN_FUNCS = {'min': np.min, 'max': np.max, 'avg': np.mean, 'sum': np.sum}

Member: Maybe add a `mean` synonym as well?

Member Author: done
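
Per this thread and commit ee630aa ("add `mean` to KNOWN_FUNCS"), the mapping presumably becomes:

KNOWN_FUNCS = {'min': np.min, 'max': np.max,
               'avg': np.mean, 'mean': np.mean, 'sum': np.sum}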



def requires_package(pkg, msg, error_type=ImportError):
"""Decorator when a function requires an optional dependency
59 changes: 42 additions & 17 deletions tests/conftest.py
@@ -25,6 +25,34 @@
)


FULL_FEATURE_DF = pd.DataFrame([
['World', 'Primary Energy', 'EJ/y', 10, 15],
['reg_a', 'Primary Energy', 'EJ/y', 6, 9],
['reg_b', 'Primary Energy', 'EJ/y', 4, 6],
['World', 'Primary Energy|Coal', 'EJ/y', 7, 10],
['reg_a', 'Primary Energy|Coal', 'EJ/y', 5, 7],
['reg_b', 'Primary Energy|Coal', 'EJ/y', 2, 3],
['World', 'Primary Energy|Wind', 'EJ/y', 3, 5],
['reg_a', 'Primary Energy|Wind', 'EJ/y', 1, 2],
['reg_b', 'Primary Energy|Wind', 'EJ/y', 2, 3],
['World', 'Emissions|CO2', 'EJ/y', 10, 14],
['World', 'Emissions|CO2|Energy', 'EJ/y', 6, 8],
['World', 'Emissions|CO2|AFOLU', 'EJ/y', 3, 4],
['World', 'Emissions|CO2|Bunkers', 'EJ/y', 1, 2],
['reg_a', 'Emissions|CO2', 'EJ/y', 6, 8],
['reg_a', 'Emissions|CO2|Energy', 'EJ/y', 4, 5],
['reg_a', 'Emissions|CO2|AFOLU', 'EJ/y', 2, 3],
['reg_b', 'Emissions|CO2', 'EJ/y', 3, 4],
['reg_b', 'Emissions|CO2|Energy', 'EJ/y', 2, 3],
['reg_b', 'Emissions|CO2|AFOLU', 'EJ/y', 1, 1],
['World', 'Price|Carbon', 'USD/tCO2', 4, 27],
['reg_a', 'Price|Carbon', 'USD/tCO2', 1, 30],
['reg_b', 'Price|Carbon', 'USD/tCO2', 10, 21],
],
columns=['region', 'variable', 'unit', 2005, 2010],
)


REG_DF = pd.DataFrame([
['IMAGE', 'a_scenario', 'NAF', 'Primary Energy', 'EJ/y', 1, 6],
['IMAGE', 'a_scenario', 'ME', 'Primary Energy', 'EJ/y', 2, 7],
@@ -177,47 +205,44 @@

TEST_YEARS = [2005, 2010]
TEST_DTS = [datetime(2005, 6, 17), datetime(2010, 7, 21)]
TEST_TIME_STR = ['2005-06-17', '2010-07-21']
TEST_TIME_STR_HR = ['2005-06-17 00:00:00', '2010-07-21 12:00:00']


# minimal IamDataFrame with four different time formats
@pytest.fixture(
scope="function",
params=[
TEST_YEARS,
TEST_DTS,
['2005-06-17', '2010-07-21'],
['2005-06-17 00:00:00', '2010-07-21 12:00:00']
TEST_TIME_STR,
TEST_TIME_STR_HR
]
)
def test_df(request):
tdf = TEST_DF.iloc[:2]
tdf = tdf.rename({2005: request.param[0], 2010: request.param[1]},
axis="columns")
tdf = TEST_DF.rename({2005: request.param[0], 2010: request.param[1]},
axis="columns")
df = IamDataFrame(data=tdf)
yield df


# minimal IamDataFrame for specifically testing 'year'-column features
@pytest.fixture(scope="function")
def test_df_year():
df = IamDataFrame(data=TEST_DF.iloc[:2])
df = IamDataFrame(data=TEST_DF)
yield df


# minimal test data provided as pandas.DataFrame (only 'year' time format)
@pytest.fixture(scope="function")
def test_pd_df():
yield TEST_DF.copy()


@pytest.fixture(
scope="function",
params=[
TEST_YEARS,
TEST_DTS,
]
)
def meta_df(request):
mdf = TEST_DF.rename({2005: request.param[0], 2010: request.param[1]},
axis="columns")
df = IamDataFrame(data=mdf)
# IamDataFrame with variable-and-region-structure for testing aggregation tools
@pytest.fixture(scope="function")
def aggregate_df():
df = IamDataFrame(model='model_a', scenario='scen_a', data=FULL_FEATURE_DF)
yield df
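
A hypothetical unit test built on this fixture (a sketch only; the exact assertions in the PR's test suite may differ):

def test_aggregate_region_weight(aggregate_df):
    # emissions-weighted carbon price: (1*6 + 10*3)/9 = 4 in 2005
    # and (30*8 + 21*4)/12 = 27 in 2010, cf. FULL_FEATURE_DF above
    obs = aggregate_df.aggregate_region('Price|Carbon', weight='Emissions|CO2')
    assert list(obs) == [4, 27]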

