diff --git a/examples/0xx_parallel_computation.py b/examples/0xx_parallel_computation.py
new file mode 100644
index 000000000..98a74e3f4
--- /dev/null
+++ b/examples/0xx_parallel_computation.py
@@ -0,0 +1,117 @@
+"""Example: Compare parallel interfaces
+"""
+
+from time import perf_counter as timerpc
+
+import numpy as np
+
+from floris import (
+    FlorisModel,
+    TimeSeries,
+    WindRose,
+)
+from floris.parallel_floris_model import ParallelFlorisModel as ParallelFlorisModel_orig
+from floris.parallel_floris_model_2 import ParallelFlorisModel as ParallelFlorisModel_new
+
+
+if __name__ == "__main__":
+    # Parallelization parameters
+    parallel_interface = "multiprocessing"
+    max_workers = 16
+
+    # Load the wind rose from csv
+    wind_rose = WindRose.read_csv_long(
+        "inputs/wind_rose.csv", wd_col="wd", ws_col="ws", freq_col="freq_val",
+        ti_col_or_value=0.06
+    )
+    fmodel = FlorisModel("inputs/gch.yaml")
+
+    # Specify wind farm layout and update in the floris object
+    N = 12  # number of turbines per row and per column
+    X, Y = np.meshgrid(
+        5.0 * fmodel.core.farm.rotor_diameters_sorted[0][0] * np.arange(0, N, 1),
+        5.0 * fmodel.core.farm.rotor_diameters_sorted[0][0] * np.arange(0, N, 1),
+    )
+    fmodel.set(layout_x=X.flatten(), layout_y=Y.flatten())
+
+    # Set up original parallel Floris model
+    pfmodel_orig = ParallelFlorisModel_orig(
+        fmodel=fmodel,
+        max_workers=max_workers,
+        n_wind_condition_splits=max_workers,
+        interface=parallel_interface,
+        print_timings=True
+    )
+
+
+    # Set up new parallel Floris model
+    pfmodel_new = ParallelFlorisModel_new(
+        "inputs/gch.yaml",
+        max_workers=max_workers,
+        n_wind_condition_splits=max_workers,
+        interface=parallel_interface,
+        print_timings=True,
+    )
+
+    # Set up new parallel Floris model using only powers
+    pfmodel_new_p = ParallelFlorisModel_new(
+        "inputs/gch.yaml",
+        max_workers=max_workers,
+        n_wind_condition_splits=max_workers,
+        interface=parallel_interface,
+        return_turbine_powers_only=True,
+        print_timings=True,
+    )
+
+    # Set layout, wind data on all models
+    fmodel.set(layout_x=X.flatten(), layout_y=Y.flatten(), wind_data=wind_rose)
+    pfmodel_orig.set(layout_x=X.flatten(), layout_y=Y.flatten(), wind_data=wind_rose)
+    pfmodel_new.set(layout_x=X.flatten(), layout_y=Y.flatten(), wind_data=wind_rose)
+    pfmodel_new_p.set(layout_x=X.flatten(), layout_y=Y.flatten(), wind_data=wind_rose)
+
+    # Run and evaluate farm over the wind rose
+    t0 = timerpc()
+    fmodel.run()
+    aep_fmodel = fmodel.get_farm_AEP()
+    t_fmodel = timerpc() - t0
+
+    t0 = timerpc()
+    # pfmodel_orig.run()
+    aep_pfmodel_orig = pfmodel_orig.get_farm_AEP(freq=wind_rose.unpack_freq())
+    t_pfmodel_orig = timerpc() - t0
+
+    t0 = timerpc()
+    pfmodel_new.run()
+    aep_pfmodel_new = pfmodel_new.get_farm_AEP()
+    t_pfmodel_new = timerpc() - t0
+
+    t0 = timerpc()
+    pfmodel_new.run()
+    aep_pfmodel_new = pfmodel_new.get_farm_AEP()
+    t_pfmodel_new2 = timerpc() - t0
+
+    t0 = timerpc()
+    pfmodel_new_p.run()
+    aep_pfmodel_new_p = pfmodel_new_p.get_farm_AEP()
+    t_pfmodel_new_p = timerpc() - t0
+
+    print("FlorisModel AEP calculation took {:.2f} seconds.".format(t_fmodel))
+    print("Original ParallelFlorisModel AEP calculation took {:.2f} seconds.".format(
+        t_pfmodel_orig
+        )
+    )
+    print("New ParallelFlorisModel AEP calculation took {:.2f} seconds.".format(t_pfmodel_new))
+    print("New ParallelFlorisModel AEP calculation took {:.2f} seconds the second time.".format(
+        t_pfmodel_new2
+        )
+    )
+    print("New ParallelFlorisModel (powers only) AEP calculation took {:.2f} seconds.".format(
+        t_pfmodel_new_p
+        )
+    )
+
+    print("\n")
+    print("FlorisModel AEP: {:.2f} GWh.".format(aep_fmodel/1E9))
+    print("Original ParallelFlorisModel AEP: {:.2f} GWh.".format(aep_pfmodel_orig/1E9))
+    print("New ParallelFlorisModel AEP: {:.2f} GWh.".format(aep_pfmodel_new/1E9))
+    print("New ParallelFlorisModel (powers only) AEP: {:.2f} GWh.".format(aep_pfmodel_new_p/1E9))
diff --git a/floris/floris_model.py b/floris/floris_model.py
index 10f410225..0967bd45c 100644
--- a/floris/floris_model.py
+++ b/floris/floris_model.py
@@ -619,8 +619,8 @@ def _get_weighted_turbine_powers(
         # Confirm run() has been run
         if self.core.state is not State.USED:
             raise RuntimeError(
-                "Can't run function `FlorisModel.get_farm_power` without "
-                "first running `FlorisModel.run`."
+                f"Can't run function `{self.__class__.__name__}.get_farm_power` without "
+                f"first running `{self.__class__.__name__}.run`."
             )

         if turbine_weights is None:
diff --git a/floris/parallel_floris_model_2.py b/floris/parallel_floris_model_2.py
new file mode 100644
index 000000000..a20e1ff8f
--- /dev/null
+++ b/floris/parallel_floris_model_2.py
@@ -0,0 +1,277 @@
+from __future__ import annotations
+
+import copy
+import warnings
+from pathlib import Path
+from time import perf_counter as timerpc
+
+import numpy as np
+import pandas as pd
+
+from floris.core import State
+from floris.floris_model import FlorisModel
+from floris.optimization.yaw_optimization.yaw_optimizer_sr import YawOptimizationSR
+
+
+class ParallelFlorisModel(FlorisModel):
+    """
+    This class mimics the FlorisModel, but enables parallelization of the main
+    computational effort.
+    """
+
+    def __init__(
+        self,
+        configuration: dict | str | Path,
+        interface: str | None = "multiprocessing",
+        max_workers: int = -1,
+        n_wind_condition_splits: int = -1,
+        return_turbine_powers_only: bool = False,
+        print_timings: bool = False
+    ):
+        """
+        Initialize the ParallelFlorisModel object.
+
+        Args:
+            configuration: The Floris configuration dictionary or YAML file.
+                The configuration should have the following inputs specified.
+                - **flow_field**: See `floris.simulation.flow_field.FlowField` for more details.
+                - **farm**: See `floris.simulation.farm.Farm` for more details.
+                - **turbine**: See `floris.simulation.turbine.Turbine` for more details.
+                - **wake**: See `floris.simulation.wake.WakeManager` for more details.
+                - **logging**: See `floris.simulation.core.Core` for more details.
+            interface: The parallelization interface to use. Options are "multiprocessing",
+                with possible future support for "mpi4py" and "concurrent".
+            max_workers: The maximum number of workers to use. Defaults to -1, which then
+                takes the number of CPUs available.
+            n_wind_condition_splits: The number of wind conditions to split the simulation over.
+                Defaults to the same as max_workers.
+            return_turbine_powers_only: Whether to return only the turbine powers.
+            print_timings (bool): Print the computation time to the console. Defaults to False.
+        """
+        # Instantiate the underlying FlorisModel
+        if isinstance(configuration, FlorisModel):
+            self.logger.warning(
+                "Received an instantiated FlorisModel, where a dictionary or a path"
+                " to a FLORIS input file was expected. Converting to a dictionary to"
+                " instantiate the ParallelFlorisModel."
+            )
+            configuration = configuration.core.as_dict()
+        super().__init__(configuration)
+
+        # Save parallelization parameters
+        if interface == "multiprocessing":
+            import multiprocessing as mp
+            self._PoolExecutor = mp.Pool
+            if max_workers == -1:
+                max_workers = mp.cpu_count()
+            # TODO: test spinning up the worker pool at this point
+        elif interface in ["mpi4py", "concurrent"]:
+            raise NotImplementedError(
+                f"Parallelization interface {interface} not yet supported."
+            )
+        elif interface is None:
+            self.logger.warning(
+                "No parallelization interface specified. Running in serial mode."
+            )
+            if return_turbine_powers_only:
+                self.logger.warning(
+                    "return_turbine_powers_only is not supported in serial mode."
+                )
+        else:
+            raise ValueError(
+                f"Invalid parallelization interface {interface}. "
+                "Options are 'multiprocessing', 'mpi4py' or 'concurrent'."
+            )
+
+        self.interface = interface
+        self.max_workers = max_workers
+        if n_wind_condition_splits == -1:
+            self.n_wind_condition_splits = max_workers
+        else:
+            self.n_wind_condition_splits = n_wind_condition_splits
+        self.return_turbine_powers_only = return_turbine_powers_only
+        self.print_timings = print_timings
+
+    def run(self) -> None:
+        """
+        Run the FLORIS model in parallel.
+        """
+
+        if self.return_turbine_powers_only:
+            # TODO: code here that does not return flow fields
+            # Somehow, overload methods on FlorisModel that need flow field
+            # data.
+
+            # This version will call super().get_turbine_powers() on each of
+            # the splits, and return them somehow.
+            self._stored_turbine_powers = None  # Temporary
+        if self.interface is None:
+            t0 = timerpc()
+            super().run()
+            t1 = timerpc()
+        elif self.interface == "multiprocessing":
+            t0 = timerpc()
+            self.core.initialize_domain()
+            parallel_run_inputs = self._preprocessing()
+            t1 = timerpc()
+            if self.return_turbine_powers_only:
+                with self._PoolExecutor(self.max_workers) as p:
+                    self._turbine_powers_split = p.starmap(
+                        _parallel_run_powers_only,
+                        parallel_run_inputs
+                    )
+            else:
+                with self._PoolExecutor(self.max_workers) as p:
+                    self._fmodels_split = p.starmap(_parallel_run, parallel_run_inputs)
+            t2 = timerpc()
+            self._postprocessing()
+            self.core.farm.finalize(self.core.grid.unsorted_indices)
+            self.core.state = State.USED
+            t3 = timerpc()
+        if self.print_timings:
+            print("===============================================================================")
+            if self.interface is None:
+                print(f"Total time spent for serial calculation (interface=None): {t1 - t0:.3f} s")
+            else:
+                print(
+                    "Total time spent for parallel calculation "
+                    f"({self.max_workers} workers): {t3-t0:.3f} s"
+                )
+                print(f"  Time spent in parallel preprocessing: {t1-t0:.3f} s")
+                print(f"  Time spent in parallel loop execution: {t2-t1:.3f} s.")
+                print(f"  Time spent in parallel postprocessing: {t3-t2:.3f} s")
+
+    def _preprocessing(self):
+        # Split over the wind conditions
+        n_wind_condition_splits = self.n_wind_condition_splits
+        n_wind_condition_splits = np.min(
+            [n_wind_condition_splits, self.core.flow_field.n_findex]
+        )
+
+        # Prepare the input arguments for parallel execution
+        fmodel_dict = self.core.as_dict()
+        wind_condition_id_splits = np.array_split(
+            np.arange(self.core.flow_field.n_findex),
+            n_wind_condition_splits,
+        )
+        multiargs = []
+        for wc_id_split in wind_condition_id_splits:
+            # for ws_id_split in wind_speed_id_splits:
+            fmodel_dict_split = copy.deepcopy(fmodel_dict)
+            wind_directions = self.core.flow_field.wind_directions[wc_id_split]
+            wind_speeds = self.core.flow_field.wind_speeds[wc_id_split]
+            turbulence_intensities = self.core.flow_field.turbulence_intensities[wc_id_split]
+
+            # Extract and format all control setpoints as a dict that can be unpacked later
+            control_setpoints_subset = {
+                "yaw_angles": self.core.farm.yaw_angles[wc_id_split, :],
+                "power_setpoints": self.core.farm.power_setpoints[wc_id_split, :],
+                "awc_modes": self.core.farm.awc_modes[wc_id_split, :],
+                "awc_amplitudes": self.core.farm.awc_amplitudes[wc_id_split, :],
+                "awc_frequencies": self.core.farm.awc_frequencies[wc_id_split, :],
+            }
+            fmodel_dict_split["flow_field"]["wind_directions"] = wind_directions
+            fmodel_dict_split["flow_field"]["wind_speeds"] = wind_speeds
+            fmodel_dict_split["flow_field"]["turbulence_intensities"] = turbulence_intensities
+
+            # Prepare lightweight data to pass along
+            multiargs.append((fmodel_dict_split, control_setpoints_subset))
+
+        return multiargs
+
+    def _postprocessing(self):
+        # Append the remaining flow_fields
+        # Could consider adding a merge method to the FlowField class
+        # to make this easier
+
+        if self.return_turbine_powers_only:
+            self._stored_turbine_powers = np.vstack(self._turbine_powers_split)
+        else:
+            # Ensure fields to set have correct dimensions
+            self.core.flow_field.u = self._fmodels_split[0].core.flow_field.u
+            self.core.flow_field.v = self._fmodels_split[0].core.flow_field.v
+            self.core.flow_field.w = self._fmodels_split[0].core.flow_field.w
+            self.core.flow_field.turbulence_intensity_field = \
+                self._fmodels_split[0].core.flow_field.turbulence_intensity_field
+
+            for fm in self._fmodels_split[1:]:
+                self.core.flow_field.u = np.append(
+                    self.core.flow_field.u,
+                    fm.core.flow_field.u,
+                    axis=0
+                )
+                self.core.flow_field.v = np.append(
+                    self.core.flow_field.v,
+                    fm.core.flow_field.v,
+                    axis=0
+                )
+                self.core.flow_field.w = np.append(
+                    self.core.flow_field.w,
+                    fm.core.flow_field.w,
+                    axis=0
+                )
+                self.core.flow_field.turbulence_intensity_field = np.append(
+                    self.core.flow_field.turbulence_intensity_field,
+                    fm.core.flow_field.turbulence_intensity_field,
+                    axis=0
+                )
+
+    def _get_turbine_powers(self):
+        """
+        Calculates the power at each turbine in the wind farm.
+        This override will only be necessary if we want to be able to
+        use the return_turbine_powers_only option. Need to check if that
+        makes a significant speed difference.
+
+        Returns:
+            NDArrayFloat: Powers at each turbine.
+        """
+        if self.core.state is not State.USED:
+            self.logger.warning(
+                f"Please call `{self.__class__.__name__}.run` before computing"
+                " turbine powers. In future versions, an explicit run() call will"
+                " be required."
+            )
+            self.run()
+        if self.return_turbine_powers_only:
+            return self._stored_turbine_powers
+        else:
+            return super()._get_turbine_powers()
+
+    @property
+    def fmodel(self):
+        """
+        Raise deprecation warning.
+        """
+        self.logger.warning(
+            "ParallelFlorisModel no longer contains `fmodel` as an attribute "
+            "and now directly inherits from FlorisModel. Please use the "
+            "attributes and methods of FlorisModel directly."
+        )
+
+
+def _parallel_run(fmodel_dict, set_kwargs) -> FlorisModel:
+    """
+    Run the FLORIS model in parallel.
+
+    Args:
+        fmodel_dict: A dictionary representation of the FLORIS model to run.
+        set_kwargs: Additional keyword arguments to pass to fmodel.set().
+    """
+    fmodel = FlorisModel(fmodel_dict)
+    fmodel.set(**set_kwargs)
+    fmodel.run()
+    return fmodel
+
+def _parallel_run_powers_only(fmodel_dict, set_kwargs) -> np.ndarray:
+    """
+    Run the FLORIS model in parallel, returning only the turbine powers.
+
+    Args:
+        fmodel_dict: A dictionary representation of the FLORIS model to run.
+        set_kwargs: Additional keyword arguments to pass to fmodel.set().
+    """
+    fmodel = FlorisModel(fmodel_dict)
+    fmodel.set(**set_kwargs)
+    fmodel.run()
+    return fmodel.get_turbine_powers()
diff --git a/tests/parallel_floris_model_2_unit_test.py b/tests/parallel_floris_model_2_unit_test.py
new file mode 100644
index 000000000..9b35b2cb7
--- /dev/null
+++ b/tests/parallel_floris_model_2_unit_test.py
@@ -0,0 +1,215 @@
+
+import copy
+import logging
+
+import numpy as np
+import pytest
+
+from floris import (
+    FlorisModel,
+    TimeSeries,
+    WindRose,
+)
+from floris.parallel_floris_model_2 import ParallelFlorisModel
+
+
+DEBUG = False
+VELOCITY_MODEL = "gauss"
+DEFLECTION_MODEL = "gauss"
+
+def test_None_interface(sample_inputs_fixture):
+    """
+    With interface=None, the ParallelFlorisModel should behave exactly like the FlorisModel.
+    (ParallelFlorisModel.run() simply calls the parent FlorisModel.run()).
+    """
+    sample_inputs_fixture.core["wake"]["model_strings"]["velocity_model"] = VELOCITY_MODEL
+    sample_inputs_fixture.core["wake"]["model_strings"]["deflection_model"] = DEFLECTION_MODEL
+
+    fmodel = FlorisModel(sample_inputs_fixture.core)
+    pfmodel = ParallelFlorisModel(
+        sample_inputs_fixture.core,
+        interface=None,
+        n_wind_condition_splits=2  # Not used when interface=None
+    )
+
+    fmodel.run()
+    pfmodel.run()
+
+    f_turb_powers = fmodel.get_turbine_powers()
+    pf_turb_powers = pfmodel.get_turbine_powers()
+
+    assert np.allclose(f_turb_powers, pf_turb_powers)
+
+def test_multiprocessing_interface(sample_inputs_fixture):
+    """
+    With interface="multiprocessing", the ParallelFlorisModel should return the same powers
+    as the FlorisModel.
+    """
+    sample_inputs_fixture.core["wake"]["model_strings"]["velocity_model"] = VELOCITY_MODEL
+    sample_inputs_fixture.core["wake"]["model_strings"]["deflection_model"] = DEFLECTION_MODEL
+
+    fmodel = FlorisModel(sample_inputs_fixture.core)
+    pfmodel = ParallelFlorisModel(
+        sample_inputs_fixture.core,
+        interface="multiprocessing",
+        n_wind_condition_splits=2
+    )
+
+    fmodel.run()
+    pfmodel.run()
+
+    f_turb_powers = fmodel.get_turbine_powers()
+    pf_turb_powers = pfmodel.get_turbine_powers()
+
+    assert np.allclose(f_turb_powers, pf_turb_powers)
+
+def test_return_turbine_powers_only(sample_inputs_fixture):
+    """
+    With return_turbine_powers_only=True, the ParallelFlorisModel should return only the
+    turbine powers, not the full results.
+    """
+    sample_inputs_fixture.core["wake"]["model_strings"]["velocity_model"] = VELOCITY_MODEL
+    sample_inputs_fixture.core["wake"]["model_strings"]["deflection_model"] = DEFLECTION_MODEL
+
+    fmodel = FlorisModel(sample_inputs_fixture.core)
+    pfmodel = ParallelFlorisModel(
+        sample_inputs_fixture.core,
+        interface="multiprocessing",
+        n_wind_condition_splits=2,
+        return_turbine_powers_only=True
+    )
+
+    fmodel.run()
+    pfmodel.run()
+
+    f_turb_powers = fmodel.get_turbine_powers()
+    pf_turb_powers = pfmodel.get_turbine_powers()
+
+    assert np.allclose(f_turb_powers, pf_turb_powers)
+
+def test_run_error(sample_inputs_fixture, caplog):
+    """
+    Check that a warning is raised if an output is requested before calling run().
+    """
+    sample_inputs_fixture.core["wake"]["model_strings"]["velocity_model"] = VELOCITY_MODEL
+    sample_inputs_fixture.core["wake"]["model_strings"]["deflection_model"] = DEFLECTION_MODEL
+
+    pfmodel = ParallelFlorisModel(
+        sample_inputs_fixture.core,
+        interface="multiprocessing",
+        n_wind_condition_splits=2
+    )
+
+    # In future versions, error will be raised
+    # with pytest.raises(RuntimeError):
+    #     pfmodel.get_turbine_powers()
+    # with pytest.raises(RuntimeError):
+    #     pfmodel.get_farm_AEP()
+
+    # For now, only a warning is raised for backwards compatibility
+    with caplog.at_level(logging.WARNING):
+        pfmodel.get_turbine_powers()
+    assert caplog.text != ""  # Checking not empty
+    caplog.clear()
+
+def test_configuration_compatibility(sample_inputs_fixture, caplog):
+    """
+    Check that the ParallelFlorisModel is compatible with FlorisModel and
+    UncertainFlorisModel configurations.
+    """
+
+    sample_inputs_fixture.core["wake"]["model_strings"]["velocity_model"] = VELOCITY_MODEL
+    sample_inputs_fixture.core["wake"]["model_strings"]["deflection_model"] = DEFLECTION_MODEL
+
+    fmodel = FlorisModel(sample_inputs_fixture.core)
+
+    with caplog.at_level(logging.WARNING):
+        ParallelFlorisModel(fmodel)
+    assert caplog.text != ""  # Checking not empty
+    caplog.clear()
+
+    pfmodel = ParallelFlorisModel(sample_inputs_fixture.core)
+    with caplog.at_level(logging.WARNING):
+        pfmodel.fmodel
+    assert caplog.text != ""  # Checking not empty
+    caplog.clear()
+
+    with pytest.raises(AttributeError):
+        pfmodel.fmodel.core
+
+def test_wind_data_objects(sample_inputs_fixture):
+    """
+    Check that the ParallelFlorisModel is compatible with WindData objects.
+    """
+
+    sample_inputs_fixture.core["wake"]["model_strings"]["velocity_model"] = VELOCITY_MODEL
+    sample_inputs_fixture.core["wake"]["model_strings"]["deflection_model"] = DEFLECTION_MODEL
+
+    fmodel = FlorisModel(sample_inputs_fixture.core)
+    pfmodel = ParallelFlorisModel(sample_inputs_fixture.core, max_workers=2)
+
+    # Create a wind rose and set onto both models
+    wind_speeds = np.array([8.0, 10.0, 12.0, 8.0, 10.0, 12.0])
+    wind_directions = np.array([270.0, 270.0, 270.0, 280.0, 280.0, 280.0])
+    wind_rose = WindRose(
+        wind_directions=np.unique(wind_directions),
+        wind_speeds=np.unique(wind_speeds),
+        ti_table=0.06
+    )
+    fmodel.set(wind_data=wind_rose)
+    pfmodel.set(wind_data=wind_rose)
+
+    # Run; get turbine powers; compare results
+    fmodel.run()
+    powers_fmodel_wr = fmodel.get_turbine_powers()
+    pfmodel.run()
+    powers_pfmodel_wr = pfmodel.get_turbine_powers()
+
+    assert powers_fmodel_wr.shape == powers_pfmodel_wr.shape
+    assert np.allclose(powers_fmodel_wr, powers_pfmodel_wr)
+
+    # Test a TimeSeries object
+    wind_speeds = np.array([8.0, 8.0, 9.0])
+    wind_directions = np.array([270.0, 270.0, 270.0])
+    values = np.array([30.0, 20.0, 10.0])
+    time_series = TimeSeries(
+        wind_directions=wind_directions,
+        wind_speeds=wind_speeds,
+        turbulence_intensities=0.06,
+        values=values,
+    )
+    fmodel.set(wind_data=time_series)
+    pfmodel.set(wind_data=time_series)
+
+    fmodel.run()
+    powers_fmodel_ts = fmodel.get_turbine_powers()
+    pfmodel.run()
+    powers_pfmodel_ts = pfmodel.get_turbine_powers()
+
+    assert powers_fmodel_ts.shape == powers_pfmodel_ts.shape
+    assert np.allclose(powers_fmodel_ts, powers_pfmodel_ts)
+
+def test_control_setpoints(sample_inputs_fixture):
+    """
+    Check that the ParallelFlorisModel is compatible with yaw angles.
+    """
+
+    sample_inputs_fixture.core["wake"]["model_strings"]["velocity_model"] = VELOCITY_MODEL
+    sample_inputs_fixture.core["wake"]["model_strings"]["deflection_model"] = DEFLECTION_MODEL
+
+    fmodel = FlorisModel(sample_inputs_fixture.core)
+    pfmodel = ParallelFlorisModel(sample_inputs_fixture.core, max_workers=2)
+
+    # Set yaw angles
+    yaw_angles = np.tile(np.array([[10.0, 20.0, 30.0]]), (fmodel.n_findex, 1))
+    fmodel.set(yaw_angles=yaw_angles)
+    pfmodel.set(yaw_angles=yaw_angles)
+
+    # Run; get turbine powers; compare results
+    fmodel.run()
+    powers_fmodel = fmodel.get_turbine_powers()
+    pfmodel.run()
+    powers_pfmodel = pfmodel.get_turbine_powers()
+
+    assert powers_fmodel.shape == powers_pfmodel.shape
+    assert np.allclose(powers_fmodel, powers_pfmodel)
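Usage note: the snippet below is a minimal sketch of how the new ParallelFlorisModel introduced in this diff is intended to be used, distilled from the example script and unit tests above. The input file path, worker counts, and wind conditions are illustrative placeholders and not part of the change itself.

import numpy as np

from floris import TimeSeries
from floris.parallel_floris_model_2 import ParallelFlorisModel

if __name__ == "__main__":
    # Construct directly from an input file; passing an instantiated FlorisModel
    # also works but triggers the conversion warning shown in the diff.
    pfmodel = ParallelFlorisModel(
        "inputs/gch.yaml",            # placeholder path to a FLORIS input file
        interface="multiprocessing",
        max_workers=4,                # illustrative; -1 uses all available CPUs
        n_wind_condition_splits=4,
        print_timings=True,
    )

    # Same set()/run()/get_*() workflow as FlorisModel
    time_series = TimeSeries(
        wind_directions=np.array([270.0, 280.0, 290.0]),
        wind_speeds=np.array([8.0, 9.0, 10.0]),
        turbulence_intensities=0.06,
    )
    pfmodel.set(wind_data=time_series)
    pfmodel.run()
    print(pfmodel.get_turbine_powers())

Because the multiprocessing pool is created inside run(), the `if __name__ == "__main__":` guard matters on platforms that use the spawn start method (e.g., Windows and macOS), where worker processes re-import the calling module.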