diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md
index 9e6f437a9..4cda6f27d 100644
--- a/RELEASE_NOTES.md
+++ b/RELEASE_NOTES.md
@@ -1,6 +1,14 @@
 # Next Release
 
+## API changes
+
+PR [#305](https://github.com/IAMconsortium/pyam/pull/305) changed the default
+behaviour of `aggregate_region()` regarding the treatment of components at the
+region-level. To keep the previous behaviour, add `components=True`.
+
+## Individual Updates
+
+- [#305](https://github.com/IAMconsortium/pyam/pull/305) Add `method` and `weight` options to the (region) aggregation functions
 - [#302](https://github.com/IAMconsortium/pyam/pull/302) Rework the tutorials
 - [#301](https://github.com/IAMconsortium/pyam/pull/301) Bugfix when using `to_excel()` with a `pd.ExcelWriter`
 - [#297](https://github.com/IAMconsortium/pyam/pull/297) Add `empty` attribute, better error for `timeseries()` on empty dataframe
@@ -8,6 +16,7 @@
 - [#292](https://github.com/IAMconsortium/pyam/pull/292) Add warning message if `data` is empty at initialization (after formatting)
 - [#288](https://github.com/IAMconsortium/pyam/pull/288) Put `pyam` logger in its own namespace (see [here](https://docs.python-guide.org/writing/logging/#logging-in-a-library))
 - [#285](https://github.com/IAMconsortium/pyam/pull/285) Add ability to fetch regions with synonyms from IXMP API
+
 # Release v0.3.0
 
 ## Highlights
diff --git a/pyam/core.py b/pyam/core.py
old mode 100644
new mode 100755
index 4da8c568b..88a70057f
--- a/pyam/core.py
+++ b/pyam/core.py
@@ -38,6 +38,7 @@
     YEAR_IDX,
     IAMC_IDX,
     SORT_IDX,
+    KNOWN_FUNCS
 )
 from pyam.read_ixmp import read_ix
 from pyam.timeseries import fill_series
@@ -766,7 +767,7 @@ def normalize(self, inplace=False, **kwargs):
         if not inplace:
             return ret
 
-    def aggregate(self, variable, components=None, append=False):
+    def aggregate(self, variable, components=None, method='sum', append=False):
         """Compute the aggregate of timeseries components or sub-categories
 
         Parameters
@@ -775,6 +776,8 @@ def aggregate(self, variable, components=None, append=False):
             variable for which the aggregate should be computed
         components: list of str, default None
             list of variables, defaults to all sub-categories of `variable`
+        method: func or str, default 'sum'
+            method to use for aggregation, e.g. np.mean, np.sum, 'min', 'max'
         append: bool, default False
             append the aggregate timeseries to `data` and return None,
             else return aggregate timeseries
@@ -789,15 +792,15 @@
             return
 
         rows = self._apply_filters(variable=components)
-        _data = _aggregate(self.data[rows], 'variable')
+        _data = _aggregate(self.data[rows], 'variable', method)
 
         if append is True:
             self.append(_data, variable=variable, inplace=True)
         else:
             return _data
 
-    def check_aggregate(self, variable, components=None, exclude_on_fail=False,
-                        multiplier=1, **kwargs):
+    def check_aggregate(self, variable, components=None, method='sum',
+                        exclude_on_fail=False, multiplier=1, **kwargs):
         """Check whether a timeseries matches the aggregation of its components
 
         Parameters
@@ -806,6 +809,8 @@ def check_aggregate(self, variable, components=None, exclude_on_fail=False,
             variable to be checked for matching aggregation of sub-categories
         components: list of str, default None
             list of variables, defaults to all sub-categories of `variable`
+        method: func or str, default 'sum'
+            method to use for aggregation, e.g. np.mean, np.sum, 'min', 'max'
         exclude_on_fail: boolean, default False
             flag scenarios failing validation as `exclude: True`
         multiplier: number, default 1
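Both `aggregate()` and `check_aggregate()` now take the same `method` argument, accepting either a string alias or a callable. A minimal usage sketch based on the signatures in this diff (the data and names below are illustrative, not part of the PR):

```python
import numpy as np
import pandas as pd
from pyam import IamDataFrame

# toy scenario data in IAMC wide format (illustrative values)
df = IamDataFrame(pd.DataFrame(
    [['model_a', 'scen_a', 'World', 'Primary Energy', 'EJ/y', 6, 9],
     ['model_a', 'scen_a', 'World', 'Primary Energy|Coal', 'EJ/y', 5, 7],
     ['model_a', 'scen_a', 'World', 'Primary Energy|Wind', 'EJ/y', 1, 2]],
    columns=['model', 'scenario', 'region', 'variable', 'unit', 2005, 2010],
))

df.aggregate('Primary Energy')                  # default: sum of sub-categories
df.aggregate('Primary Energy', method='max')    # string alias, resolved via KNOWN_FUNCS
df.aggregate('Primary Energy', method=np.mean)  # or pass a callable directly
```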
@@ -820,7 +825,8 @@ def check_aggregate(self, variable, components=None, exclude_on_fail=False,
         # filter and groupby data, use `pd.Series.align` for matching index
         rows = self._apply_filters(variable=variable)
         df_variable, df_components = (
-            _aggregate(self.data[rows], 'variable').align(df_components)
+            _aggregate(self.data[rows], 'variable', method)
+            .align(df_components)
         )
 
         # use `np.isclose` for checking match
@@ -837,7 +843,8 @@ def check_aggregate(self, variable, components=None, exclude_on_fail=False,
         return IamDataFrame(diff, variable=variable).timeseries()
 
     def aggregate_region(self, variable, region='World', subregions=None,
-                         components=None, append=False):
+                         components=False, method='sum', weight=None,
+                         append=False):
         """Compute the aggregate of timeseries over a number of regions
         including variable components only defined at the `region` level
 
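This is the behaviour change flagged in the release notes above: `components` now defaults to `False`, so variables defined only at the `region` level are no longer added implicitly. A migration sketch, assuming an `IamDataFrame` `df` with several subregions:

```python
# pre-#305 default: region-level components (e.g. bunker emissions reported
# only at the World level) were added on top of the subregional sum;
# pass `components=True` to keep that behaviour
world_total = df.aggregate_region('Emissions|CO2', components=True)

# new default: plain aggregation across subregions only
world_sum = df.aggregate_region('Emissions|CO2')
```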
@@ -849,18 +856,26 @@ def aggregate_region(self, variable, region='World', subregions=None,
             dimension
         subregions: list of str
             list of subregions, defaults to all regions other than `region`
-        components: list of str
-            list of variables to include in the aggregate from the `region`
-            level, defaults to all sub-categories of `variable` included in
-            `region` but not in any of `subregions`
+        components: bool or list of str, default False
+            variables at the `region` level to be included in the aggregation
+            (ignored if False); if `True`, use all sub-categories of `variable`
+            included in `region` but not in any of the `subregions`;
+            or explicit list of variables
+        method: func or str, default 'sum'
+            method to use for aggregation, e.g. np.mean, np.sum, 'min', 'max'
+        weight: str, default None
+            variable to use as weight for the aggregation
+            (currently only supported with `method='sum'`)
         append: bool, default False
             append the aggregate timeseries to `data` and return None,
             else return aggregate timeseries
         """
+        if weight is not None and components is not False:
+            msg = 'using weights and components in one operation not supported'
+            raise ValueError(msg)
+
         # default subregions to all regions other than `region`
-        if subregions is None:
-            rows = self._apply_filters(variable=variable)
-            subregions = set(self.data[rows].region) - set([region])
+        subregions = subregions or self._all_other_regions(region, variable)
 
         if not len(subregions):
             msg = 'cannot aggregate variable `{}` to `{}` because it does not'\
@@ -872,21 +887,31 @@ def aggregate_region(self, variable, region='World', subregions=None,
         # compute aggregate over all subregions
         subregion_df = self.filter(region=subregions)
         cols = ['region', 'variable']
-        _data = _aggregate(subregion_df.filter(variable=variable).data, cols)
-
-        # add components at the `region` level, defaults to all variables one
-        # level below `variable` that are only present in `region`
-        with adjust_log_level(logger):
-            region_df = self.filter(region=region)
-
-        rdf_comps = region_df._variable_components(variable, level=None)
-        srdf_comps = subregion_df._variable_components(variable, level=None)
-        components = components or set(rdf_comps).difference(srdf_comps)
-
-        if len(components):
-            rows = region_df._apply_filters(variable=components)
-            _data = _data.add(_aggregate(region_df.data[rows], cols),
-                              fill_value=0)
+        rows = subregion_df._apply_filters(variable=variable)
+        if weight is None:
+            _data = _aggregate(subregion_df.data[rows], cols, method=method)
+        else:
+            weight_rows = subregion_df._apply_filters(variable=weight)
+            _data = _aggregate_weight(subregion_df.data[rows],
+                                      subregion_df.data[weight_rows], method)
+
+        # if not `components=False`, add components at the `region` level
+        if components is not False:
+            with adjust_log_level(logger):
+                region_df = self.filter(region=region)
+
+            # if `True`, auto-detect `components` at the `region` level,
+            # defaults to variables below `variable` only present in `region`
+            if components is True:
+                level = dict(level=None)
+                r_comps = region_df._variable_components(variable, **level)
+                sr_comps = subregion_df._variable_components(variable, **level)
+                components = set(r_comps).difference(sr_comps)
+
+            if len(components):
+                rows = region_df._apply_filters(variable=components)
+                _data = _data.add(_aggregate(region_df.data[rows], cols),
+                                  fill_value=0)
 
         if append is True:
             self.append(_data, region=region, variable=variable, inplace=True)
@@ -894,8 +919,8 @@
         return _data
 
     def check_aggregate_region(self, variable, region='World', subregions=None,
-                               components=None, exclude_on_fail=False,
-                               **kwargs):
+                               components=False, method='sum', weight=None,
+                               exclude_on_fail=False, **kwargs):
         """Check whether the region timeseries data match the aggregation
         of components
 
@@ -907,16 +932,23 @@ def check_aggregate_region(self, variable, region='World', subregions=None,
             region to be checked for matching aggregation of subregions
         subregions: list of str
             list of subregions, defaults to all regions other than `region`
-        components: list of str, default None
-            list of variables, defaults to all sub-categories of `variable`
-            included in `region` but not in any of `subregions`
+        components: bool or list of str, default False
+            variables at the `region` level to be included in the aggregation
+            (ignored if False); if `True`, use all sub-categories of `variable`
+            included in `region` but not in any of the `subregions`;
+            or explicit list of variables
+        method: func or str, default 'sum'
+            method to use for aggregation, e.g. np.mean, np.sum, 'min', 'max'
+        weight: str, default None
+            variable to use as weight for the aggregation
+            (currently only supported with `method='sum'`)
         exclude_on_fail: boolean, default False
             flag scenarios failing validation as `exclude: True`
         kwargs: passed to `np.isclose()`
         """
         # compute aggregate from subregions, return None if no subregions
         df_subregions = self.aggregate_region(variable, region, subregions,
-                                              components)
+                                              components, method, weight)
 
         if df_subregions is None:
             return
@@ -947,6 +979,11 @@
         col_args = dict(region=region, variable=variable)
         return IamDataFrame(diff, **col_args).timeseries()
 
+    def _all_other_regions(self, region, variable):
+        """Return list of regions other than `region` containing `variable`"""
+        rows = self._apply_filters(variable=variable)
+        return set(self.data[rows].region) - set([region])
+
     def _variable_components(self, variable, level=0):
         """Get all components (sub-categories) of a variable for a given level
 
@@ -958,7 +995,7 @@
             level=level)]
 
     def check_internal_consistency(self, **kwargs):
-        """Check whether the database is internally consistent
+        """Check whether a scenario ensemble is internally consistent
 
         We check that all variables are equal to the sum of their sectoral
         components and that all the regions add up to the World total. If
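With the new `weight` argument, `aggregate_region()` computes a weighted average across subregions instead of a sum; the guard added above makes `weight` and region-level `components` mutually exclusive. A sketch, again assuming a suitable `df`:

```python
# emissions-weighted average of the carbon price across subregions
price_world = df.aggregate_region('Price|Carbon', weight='Emissions|CO2')

# combining weights with region-level components raises an error:
# df.aggregate_region('Price|Carbon', weight='Emissions|CO2', components=True)
# -> ValueError: using weights and components in one operation not supported
```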
@@ -981,7 +1018,10 @@
             if diff_agg is not None:
                 inconsistent_vars[variable + "-aggregate"] = diff_agg
 
-            diff_regional = self.check_aggregate_region(variable, **kwargs)
+            diff_regional = (
+                self.check_aggregate_region(variable, components=True,
+                                            **kwargs)
+            )
             if diff_regional is not None:
                 inconsistent_vars[variable + "-regional"] = diff_regional
 
@@ -1444,11 +1484,45 @@ def _meta_idx(data):
     return data[META_IDX].drop_duplicates().set_index(META_IDX).index
 
 
-def _aggregate(df, by):
+def _aggregate(df, by, method=np.sum):
     """Aggregate `df` by specified column(s), return indexed `pd.Series`"""
     by = [by] if isstr(by) else by
     cols = [c for c in list(df.columns) if c not in ['value'] + by]
-    return df.groupby(cols).sum()['value']
+    # pick aggregator func (default: sum)
+    return df.groupby(cols)['value'].agg(_get_method_func(method))
+
+
+def _aggregate_weight(df, weight, method):
+    """Aggregate `df` by regions with weights, return indexed `pd.Series`"""
+    # only summation allowed with weights
+    if method not in ['sum', np.sum]:
+        raise ValueError('only method `np.sum` allowed for weighted average')
+
+    _data = _get_value_col(df, YEAR_IDX)
+    _weight = _get_value_col(weight, YEAR_IDX)
+
+    if not _data.index.equals(_weight.index):
+        raise ValueError('inconsistent index between variable and weight')
+
+    cols = META_IDX + ['year']
+    return (_data * _weight).groupby(cols).sum() / _weight.groupby(cols).sum()
+
+
+def _get_method_func(method):
+    """Translate a string to a known method"""
+    if not isstr(method):
+        return method
+
+    if method in KNOWN_FUNCS:
+        return KNOWN_FUNCS[method]
+
+    # raise error if `method` is a string but not in dict of known methods
+    raise ValueError('method `{}` is not a known aggregator'.format(method))
+
+
+def _get_value_col(df, cols):
+    """Return the value column as `pd.Series` sorted by index"""
+    return df.set_index(cols)['value'].sort_index()
 
 
 def _raise_filter_error(col):
diff --git a/pyam/utils.py b/pyam/utils.py
index 4b3ad9f31..c0445ea0d 100644
--- a/pyam/utils.py
+++ b/pyam/utils.py
@@ -33,6 +33,9 @@
     + ['{}{}'.format(i, j) for i, j in itertools.product(
         string.ascii_uppercase, string.ascii_uppercase)]))
 
+KNOWN_FUNCS = {'min': np.min, 'max': np.max, 'avg': np.mean, 'mean': np.mean,
+               'sum': np.sum}
+
 
 def requires_package(pkg, msg, error_type=ImportError):
     """Decorator when a function requires an optional dependency
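`_get_method_func()` resolves string aliases through the new `KNOWN_FUNCS` mapping and passes callables through unchanged, so any string other than the five listed aliases is rejected. A behaviour sketch:

```python
import numpy as np
from pyam.utils import KNOWN_FUNCS

assert KNOWN_FUNCS['avg'] is np.mean   # 'avg' and 'mean' are synonyms
assert KNOWN_FUNCS['sum'] is np.sum

# method='median' would raise a ValueError (not a known alias),
# but method=np.median works, since callables bypass the lookup
```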
diff --git a/tests/conftest.py b/tests/conftest.py
index 42e61c0a3..9b11121ce 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -25,6 +25,34 @@
 )
 
 
+FULL_FEATURE_DF = pd.DataFrame([
+    ['World', 'Primary Energy', 'EJ/y', 10, 15],
+    ['reg_a', 'Primary Energy', 'EJ/y', 6, 9],
+    ['reg_b', 'Primary Energy', 'EJ/y', 4, 6],
+    ['World', 'Primary Energy|Coal', 'EJ/y', 7, 10],
+    ['reg_a', 'Primary Energy|Coal', 'EJ/y', 5, 7],
+    ['reg_b', 'Primary Energy|Coal', 'EJ/y', 2, 3],
+    ['World', 'Primary Energy|Wind', 'EJ/y', 3, 5],
+    ['reg_a', 'Primary Energy|Wind', 'EJ/y', 1, 2],
+    ['reg_b', 'Primary Energy|Wind', 'EJ/y', 2, 3],
+    ['World', 'Emissions|CO2', 'EJ/y', 10, 14],
+    ['World', 'Emissions|CO2|Energy', 'EJ/y', 6, 8],
+    ['World', 'Emissions|CO2|AFOLU', 'EJ/y', 3, 4],
+    ['World', 'Emissions|CO2|Bunkers', 'EJ/y', 1, 2],
+    ['reg_a', 'Emissions|CO2', 'EJ/y', 6, 8],
+    ['reg_a', 'Emissions|CO2|Energy', 'EJ/y', 4, 5],
+    ['reg_a', 'Emissions|CO2|AFOLU', 'EJ/y', 2, 3],
+    ['reg_b', 'Emissions|CO2', 'EJ/y', 3, 4],
+    ['reg_b', 'Emissions|CO2|Energy', 'EJ/y', 2, 3],
+    ['reg_b', 'Emissions|CO2|AFOLU', 'EJ/y', 1, 1],
+    ['World', 'Price|Carbon', 'USD/tCO2', 4, 27],
+    ['reg_a', 'Price|Carbon', 'USD/tCO2', 1, 30],
+    ['reg_b', 'Price|Carbon', 'USD/tCO2', 10, 21],
+],
+    columns=['region', 'variable', 'unit', 2005, 2010],
+)
+
+
 REG_DF = pd.DataFrame([
     ['IMAGE', 'a_scenario', 'NAF', 'Primary Energy', 'EJ/y', 1, 6],
     ['IMAGE', 'a_scenario', 'ME', 'Primary Energy', 'EJ/y', 2, 7],
@@ -181,7 +209,7 @@
 TEST_TIME_STR_HR = ['2005-06-17 00:00:00', '2010-07-21 12:00:00']
 
 
-# IamDataFrame with four different time formats
+# minimal IamDataFrame with four different time formats
 @pytest.fixture(
     scope="function",
     params=[
@@ -198,19 +226,26 @@ def test_df(request):
     yield df
 
 
-# IamDataFrame for testing specifically for 'year'-column feature
+# minimal IamDataFrame for specifically testing 'year'-column features
 @pytest.fixture(scope="function")
 def test_df_year():
     df = IamDataFrame(data=TEST_DF)
     yield df
 
 
-# standard test data as pandas.DataFrame (only 'year' time format)
+# minimal test data provided as pandas.DataFrame (only 'year' time format)
 @pytest.fixture(scope="function")
 def test_pd_df():
     yield TEST_DF.copy()
 
 
+# IamDataFrame with variable-and-region-structure for testing aggregation tools
+@pytest.fixture(scope="function")
+def aggregate_df():
+    df = IamDataFrame(model='model_a', scenario='scen_a', data=FULL_FEATURE_DF)
+    yield df
+
+
 @pytest.fixture(scope="function")
 def check_aggregate_df():
     df = IamDataFrame(data=CHECK_AGG_DF)
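The `Price|Carbon` rows in `FULL_FEATURE_DF` are constructed so that the `World` value equals the emissions-weighted average of the subregions; checking the arithmetic against the fixture data:

```python
# 2005: reg_a price 1 (weight 6), reg_b price 10 (weight 3)
assert (1 * 6 + 10 * 3) / (6 + 3) == 4      # World Price|Carbon in 2005
# 2010: reg_a price 30 (weight 8), reg_b price 21 (weight 4)
assert (30 * 8 + 21 * 4) / (8 + 4) == 27    # World Price|Carbon in 2010
```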
diff --git a/tests/test_feature_aggregate.py b/tests/test_feature_aggregate.py
index 59d90771c..c894ef65d 100644
--- a/tests/test_feature_aggregate.py
+++ b/tests/test_feature_aggregate.py
@@ -8,6 +8,91 @@
 from conftest import TEST_DTS
 
 
+def test_aggregate(aggregate_df):
+    df = aggregate_df
+
+    # primary energy is a direct sum (within each region)
+    assert df.check_aggregate('Primary Energy') is None
+
+    # rename sub-category to test setting components as list
+    _df = df.rename(variable={'Primary Energy|Wind': 'foo'})
+    assert _df.check_aggregate('Primary Energy') is not None
+    components = ['Primary Energy|Coal', 'foo']
+    assert _df.check_aggregate('Primary Energy', components=components) is None
+
+    # use other method (max) both as string and passing the function
+    idx = ['model', 'scenario', 'region', 'unit', 'year']
+    exp = pd.DataFrame([
+        ['model_a', 'scen_a', 'World', 'EJ/y', 2005, 7.0],
+        ['model_a', 'scen_a', 'World', 'EJ/y', 2010, 10.0],
+        ['model_a', 'scen_a', 'reg_a', 'EJ/y', 2005, 5.0],
+        ['model_a', 'scen_a', 'reg_a', 'EJ/y', 2010, 7.0],
+        ['model_a', 'scen_a', 'reg_b', 'EJ/y', 2005, 2.0],
+        ['model_a', 'scen_a', 'reg_b', 'EJ/y', 2010, 3.0],
+
+    ],
+        columns=idx + ['value']
+    ).set_index(idx).value
+    obs = df.aggregate('Primary Energy', method='max')
+    pd.testing.assert_series_equal(obs, exp)
+
+    obs = df.aggregate('Primary Energy', method=np.max)
+    pd.testing.assert_series_equal(obs, exp)
+
+    # using illegal method raises an error
+    pytest.raises(ValueError, df.aggregate, 'Primary Energy', method='foo')
+
+
+def test_aggregate_region(aggregate_df):
+    df = aggregate_df
+
+    # primary energy is a direct sum (across regions)
+    assert df.check_aggregate_region('Primary Energy') is None
+
+    # CO2 emissions have "bunkers" only defined at the region level
+    v = 'Emissions|CO2'
+    assert df.check_aggregate_region(v) is not None
+    assert df.check_aggregate_region(v, components=True) is None
+
+    # rename emissions of bunker to test setting components as list
+    _df = df.rename(variable={'Emissions|CO2|Bunkers': 'foo'})
+    assert _df.check_aggregate_region(v, components=['foo']) is None
+
+    # carbon price shouldn't be summed but be weighted by emissions
+    assert df.check_aggregate_region('Price|Carbon') is not None
+    assert df.check_aggregate_region('Price|Carbon', weight=v) is None
+
+    # inconsistent index of variable and weight raises an error
+    _df = df.filter(variable='Emissions|CO2', region='reg_b', keep=False)
+    pytest.raises(ValueError, _df.aggregate_region, 'Price|Carbon',
+                  weight='Emissions|CO2')
+
+    # setting both weight and components raises an error
+    pytest.raises(ValueError, df.aggregate_region, v, components=True,
+                  weight='bar')
+
+    # use other method (max) both as string and passing the function
+    idx = ['model', 'scenario', 'unit', 'year']
+    exp = pd.DataFrame([
+        ['model_a', 'scen_a', 'USD/tCO2', 2005, 10.0],
+        ['model_a', 'scen_a', 'USD/tCO2', 2010, 30.0]
+    ],
+        columns=idx + ['value']
+    ).set_index(idx).value
+    obs = df.aggregate_region('Price|Carbon', method='max')
+    pd.testing.assert_series_equal(obs, exp)
+
+    obs = df.aggregate_region('Price|Carbon', method=np.max)
+    pd.testing.assert_series_equal(obs, exp)
+
+    # using illegal method raises an error
+    pytest.raises(ValueError, df.aggregate_region, v, method='foo')
+
+    # using weight and method other than 'sum' raises an error
+    pytest.raises(ValueError, df.aggregate_region, v, method='max',
+                  weight='bar')
+
+
 def test_missing_region(check_aggregate_df):
     # for now, this test makes sure that this operation works as expected
     exp = check_aggregate_df.aggregate_region('Primary Energy', region='foo')
@@ -76,6 +161,11 @@ def test_do_aggregate_append(test_df):
     pd.testing.assert_frame_equal(df.timeseries(), exp)
 
 
+def test_aggregate_unknown_method(reg_df):
+    pytest.raises(ValueError, reg_df.aggregate_region, 'Primary Energy',
+                  method='foo')
+
+
 def test_check_aggregate_pass(check_aggregate_df):
     obs = check_aggregate_df.filter(
         scenario='a_scen'
     )
@@ -123,11 +213,12 @@ def test_df_check_aggregate_pass(check_aggregate_df):
 
 
 def test_df_check_aggregate_region_pass(check_aggregate_df):
-    obs = check_aggregate_df.check_aggregate_region('Primary Energy')
+    comp = dict(components=True)
+    obs = check_aggregate_df.check_aggregate_region('Primary Energy', **comp)
     assert obs is None
 
     for variable in check_aggregate_df.variables():
-        obs = check_aggregate_df.check_aggregate_region(variable)
+        obs = check_aggregate_df.check_aggregate_region(variable, **comp)
         assert obs is None
 
 
@@ -163,7 +254,7 @@ def run_check_agg_fail(pyam_df, tweak_dict, test_type):
         )
     elif 'region' in test_type:
         obs = pyam_df.check_aggregate_region(
-            variable,
+            variable, components=True
         )
 
     if obs is not None:
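The error paths covered by these tests can be reproduced directly; a sketch, assuming `df` is the `aggregate_df` fixture:

```python
import pytest

# a string that is not a KNOWN_FUNCS alias is rejected
with pytest.raises(ValueError):
    df.aggregate_region('Emissions|CO2', method='foo')

# weights are only supported with the default method='sum'
with pytest.raises(ValueError):
    df.aggregate_region('Emissions|CO2', method='max', weight='Emissions|CO2')
```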
@@ -254,34 +345,39 @@ def test_df_check_aggregate_region_errors(check_aggregate_regional_df):
 
 
 def test_df_check_aggregate_region_components(check_aggregate_regional_df):
     obs = check_aggregate_regional_df.check_aggregate_region(
-        'Emissions|N2O', 'World', subregions=['REUROPE', 'RASIA']
+        'Emissions|N2O', 'World', subregions=['REUROPE', 'RASIA'],
+        components=True
     )
     assert obs is None
 
     obs = check_aggregate_regional_df.check_aggregate_region(
-        'Emissions|N2O|Ind|Solvents', 'World', subregions=['REUROPE', 'RASIA']
+        'Emissions|N2O|Ind|Solvents', 'World', subregions=['REUROPE', 'RASIA'],
+        components=True
     )
     assert obs is None
 
     obs = check_aggregate_regional_df.check_aggregate_region(
-        'Emissions|N2O', 'REUROPE', subregions=['Germany', 'UK']
+        'Emissions|N2O', 'REUROPE', subregions=['Germany', 'UK'],
+        components=True
    )
     assert obs is None
 
     obs = check_aggregate_regional_df.check_aggregate_region(
-        'Emissions|N2O', 'RASIA', subregions=['China', 'Japan']
+        'Emissions|N2O', 'RASIA', subregions=['China', 'Japan'],
+        components=True
     )
     assert obs is None
 
     obs = check_aggregate_regional_df.check_aggregate_region(
-        'Emissions|N2O|Ind|Transport', 'REUROPE', subregions=['Germany', 'UK']
+        'Emissions|N2O|Ind|Transport', 'REUROPE', subregions=['Germany', 'UK'],
+        components=True
     )
     assert obs is None
 
 
 @pytest.mark.parametrize("components,exp_vals", (
     # should find sub-components including nested bunkers
-    (None, [1.9, 15.7]),
+    (True, [1.9, 15.7]),
     # should only add AFOLU onto regional sum, not Shipping emissions
     (["Emissions|N2O|AFOLU"], [0.9, 9.7]),
     # specifying Ind leads to double counting (and not skipping AFOLU) but as