From 0b8df8b1338dd799dde03f1602b8c377ea8eefa0 Mon Sep 17 00:00:00 2001
From: Vincenzo Eduardo Padulano
Date: Tue, 15 Oct 2024 01:16:35 +0200
Subject: [PATCH] [df] Unify local and distributed API

Unify the main entry points common to the local and distributed RDataFrame
APIs. Currently these changes affect:
- The ROOT.RDataFrame constructor
- ROOT.RDF.RunGraphs
- ROOT.RDF.Experimental.VariationsFor

Whenever one of the above is called, a pythonization dispatches to the
appropriate RDataFrame flavour, depending on the arguments. The dispatcher
checks for the presence of an "executor" keyword argument, which is expected
to be an instance of either `distributed.Client` or `pyspark.SparkContext`,
as those are the two distributed executors currently supported.

Previous usage of the distributed module through fully qualified function
names still works, although the unified API is preferable.
---
 .../python/DistRDF/Backends/Dask/__init__.py  | 29 ++++++-
 .../python/DistRDF/Backends/Spark/__init__.py | 31 ++++++-
 .../distrdf/python/DistRDF/__init__.py        | 38 +++++++--
 bindings/pyroot/pythonizations/CMakeLists.txt |  3 +-
 .../pythonizations/python/ROOT/_facade.py     | 39 ++++-----
 .../ROOT/_pythonization/_rdf_namespace.py     | 85 +++++++++++++++++++
 tree/dataframe/src/RDataFrame.cxx             | 51 +++++------
 .../dataframe/distrdf001_spark_connection.py  |  5 +-
 .../dataframe/distrdf002_dask_connection.py   |  5 +-
 .../dataframe/distrdf004_dask_lxbatch.py      |  3 +-
 10 files changed, 220 insertions(+), 69 deletions(-)
 create mode 100644 bindings/pyroot/pythonizations/python/ROOT/_pythonization/_rdf_namespace.py

diff --git a/bindings/experimental/distrdf/python/DistRDF/Backends/Dask/__init__.py b/bindings/experimental/distrdf/python/DistRDF/Backends/Dask/__init__.py
index cd9014e03af42..6bb0c182e88e6 100644
--- a/bindings/experimental/distrdf/python/DistRDF/Backends/Dask/__init__.py
+++ b/bindings/experimental/distrdf/python/DistRDF/Backends/Dask/__init__.py
@@ -3,13 +3,15 @@
 # @date 2021-11
 
 ################################################################################
-# Copyright (C) 1995-2021, Rene Brun and Fons Rademakers.                      #
+# Copyright (C) 1995-2024, Rene Brun and Fons Rademakers.                      #
 # All rights reserved.                                                         #
 #                                                                              #
 # For the licensing terms see $ROOTSYS/LICENSE.                                #
 # For the list of contributors see $ROOTSYS/README/CREDITS.                    #
 ################################################################################
 from __future__ import annotations
+import warnings
+
 
 def RDataFrame(*args, **kwargs):
     """
@@ -18,6 +20,29 @@ def RDataFrame(*args, **kwargs):
     from DistRDF.Backends.Dask import Backend
     daskclient = kwargs.get("daskclient", None)
-    daskbackend = Backend.DaskBackend(daskclient=daskclient)
+    executor = kwargs.get("executor", None)
+    msg_warn = (
+        "The keyword argument 'daskclient' is not necessary anymore and will "
+        "be removed in a future release. Use 'executor' instead to provide the "
+        "distributed.Client object."
+    )
+    msg_err = (
+        "Both the 'daskclient' and 'executor' keyword arguments were provided. "
+        "This is not supported. Please provide only the 'executor' argument."
+ ) + + if daskclient is not None: + warnings.warn(msg_warn, FutureWarning) + if executor is not None: + raise ValueError(msg_err) + else: + executor = daskclient + daskclient = None + + if executor is not None and daskclient is not None: + warnings.warn(msg_warn, FutureWarning) + raise ValueError(msg_err) + + daskbackend = Backend.DaskBackend(daskclient=executor) return daskbackend.make_dataframe(*args, **kwargs) diff --git a/bindings/experimental/distrdf/python/DistRDF/Backends/Spark/__init__.py b/bindings/experimental/distrdf/python/DistRDF/Backends/Spark/__init__.py index 716601eac122b..1c507eba635c7 100644 --- a/bindings/experimental/distrdf/python/DistRDF/Backends/Spark/__init__.py +++ b/bindings/experimental/distrdf/python/DistRDF/Backends/Spark/__init__.py @@ -1,15 +1,17 @@ -## @author Vincenzo Eduardo Padulano +# @author Vincenzo Eduardo Padulano # @author Enric Tejedor # @date 2021-02 ################################################################################ -# Copyright (C) 1995-2021, Rene Brun and Fons Rademakers. # +# Copyright (C) 1995-2024, Rene Brun and Fons Rademakers. # # All rights reserved. # # # # For the licensing terms see $ROOTSYS/LICENSE. # # For the list of contributors see $ROOTSYS/README/CREDITS. # ################################################################################ from __future__ import annotations +import warnings + def RDataFrame(*args, **kwargs): """ @@ -18,6 +20,29 @@ def RDataFrame(*args, **kwargs): from DistRDF.Backends.Spark import Backend sparkcontext = kwargs.get("sparkcontext", None) - spark = Backend.SparkBackend(sparkcontext=sparkcontext) + executor = kwargs.get("executor", None) + msg_warn = ( + "The keyword argument 'sparkcontext' is not necessary anymore and will " + "be removed in a future release. Use 'executor' instead to provide the " + "SparkContext object." + ) + msg_err = ( + "Both the 'sparkcontext' and 'executor' keyword arguments were provided. " + "This is not supported. Please provide only the 'executor' argument." + ) + + if sparkcontext is not None: + warnings.warn(msg_warn, FutureWarning) + if executor is not None: + raise ValueError(msg_err) + else: + executor = sparkcontext + sparkcontext = None + + if executor is not None and sparkcontext is not None: + warnings.warn(msg_warn, FutureWarning) + raise ValueError(msg_err) + + spark = Backend.SparkBackend(sparkcontext=executor) return spark.make_dataframe(*args, **kwargs) diff --git a/bindings/experimental/distrdf/python/DistRDF/__init__.py b/bindings/experimental/distrdf/python/DistRDF/__init__.py index cb69b64dd2700..b2d7b2369e8c9 100644 --- a/bindings/experimental/distrdf/python/DistRDF/__init__.py +++ b/bindings/experimental/distrdf/python/DistRDF/__init__.py @@ -3,7 +3,7 @@ # @date 2021-02 ################################################################################ -# Copyright (C) 1995-2022, Rene Brun and Fons Rademakers. # +# Copyright (C) 1995-2024, Rene Brun and Fons Rademakers. # # All rights reserved. # # # # For the licensing terms see $ROOTSYS/LICENSE. 
#
@@ -68,19 +68,17 @@ def RunGraphs(proxies: Iterable) -> int:
 
         @code{.py}
         import ROOT
-        RDataFrame = ROOT.RDF.Experimental.Distributed.Dask.RDataFrame
-        RunGraphs = ROOT.RDF.Experimental.Distributed.RunGraphs
 
         # Create 3 different dataframes and book a histogram on each one
         histoproxies = [
-            RDataFrame(100)
+            ROOT.RDataFrame(100, executor=SupportedExecutor(...))
             .Define("x", "rdfentry_")
             .Histo1D(("name", "title", 10, 0, 100), "x")
             for _ in range(3)
         ]
 
         # Execute the 3 computation graphs
-        n_graphs_run = RunGraphs(histoproxies)
+        n_graphs_run = ROOT.RDF.RunGraphs(histoproxies)
 
         # Retrieve all the histograms in one go
         histos = [histoproxy.GetValue() for histoproxy in histoproxies]
         @endcode
@@ -138,3 +136,33 @@ def create_distributed_module(parentmodule):
     distributed.LiveVisualize = LiveVisualize
 
     return distributed
+
+
+def RDataFrame(*args, **kwargs):
+    executor = kwargs.get("executor", None)
+    if executor is None:
+        raise ValueError(
+            "Missing keyword argument 'executor'. Please provide a connection object "
+            "to one of the schedulers supported by distributed RDataFrame."
+        )
+
+    # Try to dispatch to the correct distributed scheduler implementation
+    try:
+        from distributed import Client
+        from DistRDF.Backends.Dask import RDataFrame
+        if isinstance(executor, Client):
+            return RDataFrame(*args, **kwargs)
+    except ImportError:
+        pass
+
+    try:
+        from pyspark import SparkContext
+        from DistRDF.Backends.Spark import RDataFrame
+        if isinstance(executor, SparkContext):
+            return RDataFrame(*args, **kwargs)
+    except ImportError:
+        pass
+
+    raise TypeError(
+        f"The executor object of type '{type(executor)}' is not a supported "
+        "connection type for distributed RDataFrame.")
diff --git a/bindings/pyroot/pythonizations/CMakeLists.txt b/bindings/pyroot/pythonizations/CMakeLists.txt
index fd19abecfaec6..2880650e75db1 100644
--- a/bindings/pyroot/pythonizations/CMakeLists.txt
+++ b/bindings/pyroot/pythonizations/CMakeLists.txt
@@ -14,7 +14,8 @@ if(dataframe)
   ROOT/_pythonization/_rdataframe.py
   ROOT/_pythonization/_rdfdescription.py
   ROOT/_pythonization/_rdf_conversion_maps.py
-  ROOT/_pythonization/_rdf_pyz.py)
+  ROOT/_pythonization/_rdf_pyz.py
+  ROOT/_pythonization/_rdf_namespace.py)
 endif()
 
 if(roofit)
diff --git a/bindings/pyroot/pythonizations/python/ROOT/_facade.py b/bindings/pyroot/pythonizations/python/ROOT/_facade.py
index 8bb5ff2aa92d9..2280054bad610 100644
--- a/bindings/pyroot/pythonizations/python/ROOT/_facade.py
+++ b/bindings/pyroot/pythonizations/python/ROOT/_facade.py
@@ -47,23 +47,6 @@ def __setattr__(self, name, value):
         return setattr(self._gROOT, name, value)
 
 
-def _create_rdf_experimental_distributed_module(parent):
-    """
-    Create the ROOT.RDF.Experimental.Distributed python module.
-
-    This module will be injected into the ROOT.RDF namespace.
-
-    Arguments:
-        parent: The ROOT.RDF namespace. Needed to define __package__.
-
-    Returns:
-        types.ModuleType: The ROOT.RDF.Experimental.Distributed submodule.
- """ - import DistRDF - - return DistRDF.create_distributed_module(parent) - - def _subimport(name): # type: (str) -> types.ModuleType """ @@ -358,8 +341,11 @@ def MakePandasDataFrame(df): ns.FromPandas = MakePandasDataFrame try: - # Inject Experimental.Distributed package into namespace RDF if available - ns.Experimental.Distributed = _create_rdf_experimental_distributed_module(ns.Experimental) + # Inject Pythonizations to interact between local and distributed RDF package + from ._pythonization._rdf_namespace import _create_distributed_module, _rungraphs, _variationsfor + ns.Experimental.Distributed = _create_distributed_module(ns.Experimental) + ns.RunGraphs = _rungraphs(ns.Experimental.Distributed.RunGraphs, ns.RunGraphs) + ns.Experimental.VariationsFor = _variationsfor(ns.Experimental.Distributed.VariationsFor, ns.Experimental.VariationsFor) except ImportError: pass except: @@ -367,6 +353,21 @@ def MakePandasDataFrame(df): del type(self).RDF return ns + @property + def RDataFrame(self): + """ + Dispatch between the local and distributed RDataFrame depending on + input arguments. + """ + local_rdf = self.__getattr__("RDataFrame") + try: + import DistRDF + from ._pythonization._rdf_namespace import _rdataframe + return _rdataframe(local_rdf, DistRDF.RDataFrame) + except ImportError: + return local_rdf + + # Overload RooFit namespace @property def RooFit(self): diff --git a/bindings/pyroot/pythonizations/python/ROOT/_pythonization/_rdf_namespace.py b/bindings/pyroot/pythonizations/python/ROOT/_pythonization/_rdf_namespace.py new file mode 100644 index 0000000000000..534cec9e8b799 --- /dev/null +++ b/bindings/pyroot/pythonizations/python/ROOT/_pythonization/_rdf_namespace.py @@ -0,0 +1,85 @@ +# Author: Vincenzo Eduardo Padulano CERN 10/2024 + +################################################################################ +# Copyright (C) 1995-2024, Rene Brun and Fons Rademakers. # +# All rights reserved. # +# # +# For the licensing terms see $ROOTSYS/LICENSE. # +# For the list of contributors see $ROOTSYS/README/CREDITS. # +################################################################################ + +""" +This module contains utilities to help in the organization of the RDataFrame +namespace and the interaction between the C++ and Python functionalities +""" + + +def _create_distributed_module(parent): + """ + Create the ROOT.RDF.Experimental.Distributed python module. + + This module will be injected into the ROOT.RDF namespace. + + Arguments: + parent: The ROOT.RDF namespace. Needed to define __package__. + + Returns: + types.ModuleType: The ROOT.RDF.Experimental.Distributed submodule. + """ + import DistRDF + + return DistRDF.create_distributed_module(parent) + + +def _rungraphs(distrdf_rungraphs, rdf_rungraphs): + """ + Create a callable that correctly dispatches either to the local or + distributed version of RunGraphs. + """ + + def rungraphs(handles): + # Caveat: we should not call `hasattr` on the result pointer, since + # this will implicitly trigger the connected computation graph + if len(handles) > 0 and "DistRDF" in str(type(handles[0])): + return distrdf_rungraphs(handles) + else: + return rdf_rungraphs(handles) + + return rungraphs + + +def _variationsfor(distrdf_variationsfor, rdf_variationsfor): + """ + Create a callable that correctly dispatches either to the local or + distributed version of VariationsFor. 
+    """
+
+    def variationsfor(resptr):
+        # Caveat: we should not call `hasattr` on the result pointer, since
+        # this will implicitly trigger the connected computation graph
+        if "DistRDF" in str(type(resptr)):
+            return distrdf_variationsfor(resptr)
+        else:
+            # Help local VariationsFor with the type of the value held by the result pointer
+            inner_type = type(resptr).__name__
+            inner_type = inner_type[
+                inner_type.index("<") + 1: inner_type.rindex(">")]
+            return rdf_variationsfor[inner_type](resptr)
+
+    return variationsfor
+
+
+def _rdataframe(local_rdf, distributed_rdf):
+    """
+    Create a callable that correctly dispatches either to the local or
+    distributed RDataFrame constructor, depending on whether the "executor"
+    keyword argument is absent or not.
+    """
+
+    def rdataframe(*args, **kwargs):
+        if kwargs.get("executor", None) is not None:
+            return distributed_rdf(*args, **kwargs)
+        else:
+            return local_rdf(*args, **kwargs)
+
+    return rdataframe
diff --git a/tree/dataframe/src/RDataFrame.cxx b/tree/dataframe/src/RDataFrame.cxx
index 8497a2c19f498..4aa54806e23b4 100644
--- a/tree/dataframe/src/RDataFrame.cxx
+++ b/tree/dataframe/src/RDataFrame.cxx
@@ -682,12 +682,12 @@ the backend-specific `RDataFrame` of your choice, for example:
 
 ~~~{.py}
 import ROOT
-
-# Point RDataFrame calls to the Spark specific RDataFrame
-RDataFrame = ROOT.RDF.Experimental.Distributed.Spark.RDataFrame
-
+from distributed import Client
 # It still accepts the same constructor arguments as traditional RDataFrame
-df = RDataFrame("mytree", "myfile.root")
+# but needs a client object which allows connecting to one of the supported
+# schedulers (more information below)
+client = Client(...)
+df = ROOT.RDataFrame("mytree", "myfile.root", executor=client)
 
 # Continue the application with the traditional RDataFrame API
 sum = df.Filter("x > 10").Sum("y")
@@ -739,16 +739,16 @@ import ROOT
 conf = SparkConf().setAppName(appName).setMaster(master)
 sc = SparkContext(conf=conf)
 
-# Point RDataFrame calls to the Spark specific RDataFrame
-RDataFrame = ROOT.RDF.Experimental.Distributed.Spark.RDataFrame
-
-# The Spark RDataFrame constructor accepts an optional "sparkcontext" parameter
-# and it will distribute the application to the connected cluster
-df = RDataFrame("mytree", "myfile.root", sparkcontext = sc)
+# The RDataFrame constructor accepts the SparkContext object through the "executor"
+# keyword argument and will distribute the application to the connected cluster
+df = ROOT.RDataFrame("mytree", "myfile.root", executor=sc)
 ~~~
 
-If an instance of [SparkContext](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkContext.html)
-is not provided, the default behaviour is to create one in the background for you.
+Note that `executor = None` is not supported with this unified constructor. To get a
+default instance of
+[SparkContext](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkContext.html)
+created for you when no connection object is available, explicitly use the
+`ROOT.RDF.Experimental.Distributed.Spark.RDataFrame` constructor instead.
 
 ### Connecting to a Dask cluster
 
@@ -760,9 +760,6 @@ of the cluster schedulers supported by Dask (more information in the
 import ROOT
 from dask.distributed import Client
 
-# Point RDataFrame calls to the Dask specific RDataFrame
-RDataFrame = ROOT.RDF.Experimental.Distributed.Dask.RDataFrame
-
 # In a Python script the Dask client needs to be initialized in a context
 # Jupyter notebooks / Python sessions don't need this
 if __name__ == "__main__":
@@ -770,14 +767,17 @@ if __name__ == "__main__":
     client = Client("dask_scheduler.domain.com:8786")
 
     # The Dask RDataFrame constructor accepts the Dask Client object as an optional argument
-    df = RDataFrame("mytree","myfile.root", daskclient=client)
+    df = ROOT.RDataFrame("mytree","myfile.root", executor=client)
 
     # Proceed as usual
     df.Define("x","someoperation").Histo1D(("name", "title", 10, 0, 10), "x")
 ~~~
 
-If an instance of [distributed.Client](http://distributed.dask.org/en/stable/api.html#distributed.Client) is not
-provided to the RDataFrame object, it will be created for you and it will run the computations in the local machine
-using all cores available.
+Note that `executor = None` is not supported with this unified constructor. To get a
+default instance of
+[distributed.Client](http://distributed.dask.org/en/stable/api.html#distributed.Client)
+created for you when no connection object is available, explicitly use the
+`ROOT.RDF.Experimental.Distributed.Dask.RDataFrame` constructor instead. The default
+client runs the computations in multiple processes on the local machine, using all available cores.
 
 ### Choosing the number of distributed tasks
@@ -797,13 +797,9 @@ backend used:
 
 ~~~{.py}
 import ROOT
 
-# Define correct imports and access the distributed RDataFrame appropriate for the
-# backend used in the analysis
-RDataFrame = ROOT.RDF.Experimental.Distributed.[BACKEND].RDataFrame
-
 if __name__ == "__main__":
     # The `npartitions` optional argument tells the RDataFrame how many tasks are desired
-    df = RDataFrame("mytree","myfile.root", npartitions=NPARTITIONS)
+    df = ROOT.RDataFrame("mytree", "myfile.root", executor=SupportedExecutor(...), npartitions=NPARTITIONS)
     # Proceed as usual
     df.Define("x","someoperation").Histo1D(("name", "title", 10, 0, 10), "x")
 ~~~
@@ -828,19 +824,17 @@ triggered concurrently to send multiple computation graphs to a distributed clus
 
 ~~~{.py}
 import ROOT
-RDataFrame = ROOT.RDF.Experimental.Distributed.Dask.RDataFrame
-RunGraphs = ROOT.RDF.Experimental.Distributed.RunGraphs
 
 # Create 3 different dataframes and book a histogram on each one
 histoproxies = [
-    RDataFrame(100)
+    ROOT.RDataFrame(100, executor=SupportedExecutor(...))
     .Define("x", "rdfentry_")
     .Histo1D(("name", "title", 10, 0, 100), "x")
     for _ in range(3)
 ]
 
 # Execute the 3 computation graphs
-RunGraphs(histoproxies)
+ROOT.RDF.RunGraphs(histoproxies)
 
 # Retrieve all the histograms in one go
 histos = [histoproxy.GetValue() for histoproxy in histoproxies]
 ~~~
@@ -855,10 +849,9 @@ computed, e.g.
the axis range and the number of bins: ~~~{.py} import ROOT -RDataFrame = ROOT.RDF.Experimental.Distributed.[BACKEND].RDataFrame if __name__ == "__main__": - df = RDataFrame("mytree","myfile.root").Define("x","someoperation") + df = ROOT.RDataFrame("mytree","myfile.root",executor=SupportedExecutor(...)).Define("x","someoperation") # The model can be passed either as a tuple with the arguments in the correct order df.Histo1D(("name", "title", 10, 0, 10), "x") # Or by creating the specific struct diff --git a/tutorials/dataframe/distrdf001_spark_connection.py b/tutorials/dataframe/distrdf001_spark_connection.py index 946466897f1a6..13de158178f89 100644 --- a/tutorials/dataframe/distrdf001_spark_connection.py +++ b/tutorials/dataframe/distrdf001_spark_connection.py @@ -18,9 +18,6 @@ import pyspark import ROOT -# Point RDataFrame calls to Spark RDataFrame object -RDataFrame = ROOT.RDF.Experimental.Distributed.Spark.RDataFrame - # Setup the connection to Spark # First create a dictionary with keys representing Spark specific configuration # parameters. In this tutorial we use the following configuration parameters: @@ -55,7 +52,7 @@ sparkcontext = pyspark.SparkContext(conf=sparkconf) # Create an RDataFrame that will use Spark as a backend for computations -df = RDataFrame(1000, sparkcontext=sparkcontext) +df = ROOT.RDataFrame(1000, executor=sparkcontext) # Set the random seed and define two columns of the dataset with random numbers. ROOT.gRandom.SetSeed(1) diff --git a/tutorials/dataframe/distrdf002_dask_connection.py b/tutorials/dataframe/distrdf002_dask_connection.py index f89aa749dc099..2f42d2c770b61 100644 --- a/tutorials/dataframe/distrdf002_dask_connection.py +++ b/tutorials/dataframe/distrdf002_dask_connection.py @@ -18,9 +18,6 @@ from dask.distributed import LocalCluster, Client import ROOT -# Point RDataFrame calls to Dask RDataFrame object -RDataFrame = ROOT.RDF.Experimental.Distributed.Dask.RDataFrame - def create_connection(): """ @@ -89,7 +86,7 @@ def create_connection(): # Create the connection to the mock Dask cluster on the local machine connection = create_connection() # Create an RDataFrame that will use Dask as a backend for computations - df = RDataFrame(1000, daskclient=connection) + df = ROOT.RDataFrame(1000, executor=connection) # Set the random seed and define two columns of the dataset with random numbers. ROOT.gRandom.SetSeed(1) diff --git a/tutorials/dataframe/distrdf004_dask_lxbatch.py b/tutorials/dataframe/distrdf004_dask_lxbatch.py index 1d4fe7a3dea69..c3f3945d5f6f3 100644 --- a/tutorials/dataframe/distrdf004_dask_lxbatch.py +++ b/tutorials/dataframe/distrdf004_dask_lxbatch.py @@ -28,7 +28,6 @@ from dask_lxplus import CernCluster import ROOT -RDataFrame = ROOT.RDF.Experimental.Distributed.Dask.RDataFrame def create_connection() -> Client: @@ -90,7 +89,7 @@ def run_analysis(connection: Client) -> None: Run a simple example with RDataFrame, using the previously created connection to the HTCondor cluster. """ - df = RDataFrame(10_000, daskclient=connection).Define( + df = ROOT.RDataFrame(10_000, executor=connection).Define( "x", "gRandom->Rndm() * 100") nentries = df.Count()
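
For reference, the following is a minimal, self-contained sketch of the unified API introduced by this patch. It assumes a Dask cluster started locally with `LocalCluster`, as in the distrdf002 tutorial above; the dataset size, column definition and histogram models are illustrative only.

~~~{.py}
from dask.distributed import LocalCluster, Client
import ROOT

if __name__ == "__main__":
    # Start a small Dask cluster on the local machine and connect a client to it
    with Client(LocalCluster(n_workers=2, threads_per_worker=1, processes=True)) as client:
        # The 'executor' keyword argument makes ROOT.RDataFrame dispatch to the
        # distributed (Dask) flavour instead of the local one
        df = ROOT.RDataFrame(1000, executor=client).Define("x", "gRandom->Rndm() * 100")

        # Book two computation graphs and run them concurrently with the unified RunGraphs
        h_all = df.Histo1D(("h_all", "all entries", 10, 0, 100), "x")
        h_cut = df.Filter("x > 50").Histo1D(("h_cut", "x > 50", 10, 0, 100), "x")
        ROOT.RDF.RunGraphs([h_all, h_cut])

        # Retrieve the results as usual
        histos = [proxy.GetValue() for proxy in (h_all, h_cut)]
        print(histos[0].GetEntries(), histos[1].GetEntries())
~~~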