[df] Unify local and distributed API
Unify the main entry points shared between the local and distributed RDataFrame APIs. Currently these changes affect:

- The ROOT.RDataFrame constructor
- ROOT.RDF.RunGraphs
- ROOT.RDF.Experimental.VariationsFor

Any time one of the above is called, a pythonization dispatches to the appropriate RDataFrame flavour depending on the arguments. The dispatcher checks for the presence of an "executor" keyword argument; when present, it must be an instance of either `distributed.Client` or `pyspark.SparkContext`, the two distributed executors currently supported.
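For example, a minimal sketch of the unified entry point with a Dask executor (not part of the commit; the scheduler address is a placeholder, any valid `distributed.Client` works):

    import ROOT
    from distributed import Client

    # Connect to a Dask scheduler (placeholder address).
    client = Client("tcp://127.0.0.1:8786")

    # Same constructor as the local RDataFrame, plus the 'executor' keyword:
    # the dispatcher selects the Dask flavour of distributed RDataFrame.
    df = ROOT.RDataFrame(1000, executor=client)
    h = df.Define("x", "rdfentry_").Histo1D(("h", "h", 100, 0, 1000), "x")
    print(h.GetValue().GetEntries())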

Previous usage of the distributed module through fully qualified function names still works, although the unified API is preferable.
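For instance, this pre-existing spelling keeps working (sketch, assuming the same `client` as above):

    # Fully qualified name, as before this commit.
    DaskRDataFrame = ROOT.RDF.Experimental.Distributed.Dask.RDataFrame
    df = DaskRDataFrame(1000, executor=client)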
vepadulano committed Oct 15, 2024
1 parent 153c6b7 commit 0b8df8b
Showing 10 changed files with 220 additions and 69 deletions.
bindings/experimental/distrdf/python/DistRDF/Backends/Dask/__init__.py

@@ -3,13 +3,15 @@
 # @date 2021-11
 
 ################################################################################
-# Copyright (C) 1995-2021, Rene Brun and Fons Rademakers. #
+# Copyright (C) 1995-2024, Rene Brun and Fons Rademakers. #
 # All rights reserved. #
 # #
 # For the licensing terms see $ROOTSYS/LICENSE. #
 # For the list of contributors see $ROOTSYS/README/CREDITS. #
 ################################################################################
+from __future__ import annotations
+import warnings
 
 
 def RDataFrame(*args, **kwargs):
     """
@@ -18,6 +20,29 @@ def RDataFrame(*args, **kwargs):
 
     from DistRDF.Backends.Dask import Backend
     daskclient = kwargs.get("daskclient", None)
-    daskbackend = Backend.DaskBackend(daskclient=daskclient)
+    executor = kwargs.get("executor", None)
+    msg_warn = (
+        "The keyword argument 'daskclient' is not necessary anymore and will "
+        "be removed in a future release. Use 'executor' instead to provide the "
+        "distributed.Client object."
+    )
+    msg_err = (
+        "Both the 'daskclient' and 'executor' keyword arguments were provided. "
+        "This is not supported. Please provide only the 'executor' argument."
+    )
+
+    if daskclient is not None:
+        warnings.warn(msg_warn, FutureWarning)
+        if executor is not None:
+            raise ValueError(msg_err)
+        else:
+            executor = daskclient
+            daskclient = None
+
+    if executor is not None and daskclient is not None:
+        warnings.warn(msg_warn, FutureWarning)
+        raise ValueError(msg_err)
+
+    daskbackend = Backend.DaskBackend(daskclient=executor)
 
     return daskbackend.make_dataframe(*args, **kwargs)
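A quick illustration of the deprecation path introduced above (a sketch, not part of the commit, assuming a small in-process Dask cluster):

    import warnings
    import ROOT
    from distributed import Client

    client = Client(processes=False)  # in-process cluster, for illustration

    # The old keyword still works but now emits a FutureWarning:
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        ROOT.RDF.Experimental.Distributed.Dask.RDataFrame(10, daskclient=client)
    assert any(issubclass(w.category, FutureWarning) for w in caught)

    # Supplying both keywords is rejected:
    try:
        ROOT.RDF.Experimental.Distributed.Dask.RDataFrame(
            10, daskclient=client, executor=client)
    except ValueError as err:
        print(err)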
bindings/experimental/distrdf/python/DistRDF/Backends/Spark/__init__.py

@@ -1,15 +1,17 @@
-## @author Vincenzo Eduardo Padulano
+# @author Vincenzo Eduardo Padulano
 # @author Enric Tejedor
 # @date 2021-02
 
 ################################################################################
-# Copyright (C) 1995-2021, Rene Brun and Fons Rademakers. #
+# Copyright (C) 1995-2024, Rene Brun and Fons Rademakers. #
 # All rights reserved. #
 # #
 # For the licensing terms see $ROOTSYS/LICENSE. #
 # For the list of contributors see $ROOTSYS/README/CREDITS. #
 ################################################################################
+from __future__ import annotations
+import warnings
 
 
 def RDataFrame(*args, **kwargs):
     """
@@ -18,6 +20,29 @@ def RDataFrame(*args, **kwargs):
 
     from DistRDF.Backends.Spark import Backend
     sparkcontext = kwargs.get("sparkcontext", None)
-    spark = Backend.SparkBackend(sparkcontext=sparkcontext)
+    executor = kwargs.get("executor", None)
+    msg_warn = (
+        "The keyword argument 'sparkcontext' is not necessary anymore and will "
+        "be removed in a future release. Use 'executor' instead to provide the "
+        "SparkContext object."
+    )
+    msg_err = (
+        "Both the 'sparkcontext' and 'executor' keyword arguments were provided. "
+        "This is not supported. Please provide only the 'executor' argument."
+    )
+
+    if sparkcontext is not None:
+        warnings.warn(msg_warn, FutureWarning)
+        if executor is not None:
+            raise ValueError(msg_err)
+        else:
+            executor = sparkcontext
+            sparkcontext = None
+
+    if executor is not None and sparkcontext is not None:
+        warnings.warn(msg_warn, FutureWarning)
+        raise ValueError(msg_err)
+
+    spark = Backend.SparkBackend(sparkcontext=executor)
 
     return spark.make_dataframe(*args, **kwargs)
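The Spark flavour follows the same pattern (sketch, not part of the commit, assuming a local Spark installation):

    import ROOT
    from pyspark import SparkContext

    sc = SparkContext.getOrCreate()

    # Unified entry point with a Spark executor:
    df = ROOT.RDataFrame(100, executor=sc)

    # Legacy spelling, now deprecated in favour of 'executor':
    df_old = ROOT.RDF.Experimental.Distributed.Spark.RDataFrame(100, sparkcontext=sc)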
38 changes: 33 additions & 5 deletions bindings/experimental/distrdf/python/DistRDF/__init__.py
@@ -3,7 +3,7 @@
 # @date 2021-02
 
 ################################################################################
-# Copyright (C) 1995-2022, Rene Brun and Fons Rademakers. #
+# Copyright (C) 1995-2024, Rene Brun and Fons Rademakers. #
 # All rights reserved. #
 # #
 # For the licensing terms see $ROOTSYS/LICENSE. #
@@ -68,19 +68,17 @@ def RunGraphs(proxies: Iterable) -> int:
     @code{.py}
     import ROOT
-    RDataFrame = ROOT.RDF.Experimental.Distributed.Dask.RDataFrame
-    RunGraphs = ROOT.RDF.Experimental.Distributed.RunGraphs
 
     # Create 3 different dataframes and book an histogram on each one
     histoproxies = [
-        RDataFrame(100)
+        ROOT.RDataFrame(100, executor=SupportedExecutor(...))
             .Define("x", "rdfentry_")
             .Histo1D(("name", "title", 10, 0, 100), "x")
         for _ in range(4)
     ]
 
     # Execute the 3 computation graphs
-    n_graphs_run = RunGraphs(histoproxies)
+    n_graphs_run = ROOT.RDF.RunGraphs(histoproxies)
 
     # Retrieve all the histograms in one go
     histos = [histoproxy.GetValue() for histoproxy in histoproxies]
     @endcode
@@ -138,3 +136,33 @@ def create_distributed_module(parentmodule):
     distributed.LiveVisualize = LiveVisualize
 
     return distributed
+
+
+def RDataFrame(*args, **kwargs):
+    executor = kwargs.get("executor", None)
+    if executor is None:
+        raise ValueError(
+            "Missing keyword argument 'executor'. Please provide a connection object "
+            "to one of the schedulers supported by distributed RDataFrame."
+        )
+
+    # Try to dispatch to the correct distributed scheduler implementation
+    try:
+        from distributed import Client
+        from DistRDF.Backends.Dask import RDataFrame as DaskRDataFrame
+        if isinstance(executor, Client):
+            return DaskRDataFrame(*args, **kwargs)
+    except ImportError:
+        pass
+
+    try:
+        from pyspark import SparkContext
+        from DistRDF.Backends.Spark import RDataFrame as SparkRDataFrame
+        if isinstance(executor, SparkContext):
+            return SparkRDataFrame(*args, **kwargs)
+    except ImportError:
+        pass
+
+    raise TypeError(
+        f"The client object of type '{type(executor)}' is not a supported "
+        "connection type for distributed RDataFrame.")
3 changes: 2 additions & 1 deletion bindings/pyroot/pythonizations/CMakeLists.txt
@@ -14,7 +14,8 @@ if(dataframe)
   ROOT/_pythonization/_rdataframe.py
   ROOT/_pythonization/_rdfdescription.py
   ROOT/_pythonization/_rdf_conversion_maps.py
-  ROOT/_pythonization/_rdf_pyz.py)
+  ROOT/_pythonization/_rdf_pyz.py
+  ROOT/_pythonization/_rdf_namespace.py)
 endif()
 
 if(roofit)
39 changes: 20 additions & 19 deletions bindings/pyroot/pythonizations/python/ROOT/_facade.py
@@ -47,23 +47,6 @@ def __setattr__(self, name, value):
         return setattr(self._gROOT, name, value)
 
 
-def _create_rdf_experimental_distributed_module(parent):
-    """
-    Create the ROOT.RDF.Experimental.Distributed python module.
-
-    This module will be injected into the ROOT.RDF namespace.
-
-    Arguments:
-        parent: The ROOT.RDF namespace. Needed to define __package__.
-
-    Returns:
-        types.ModuleType: The ROOT.RDF.Experimental.Distributed submodule.
-    """
-    import DistRDF
-
-    return DistRDF.create_distributed_module(parent)
-
-
 def _subimport(name):
     # type: (str) -> types.ModuleType
     """
@@ -358,15 +341,33 @@ def MakePandasDataFrame(df):
             ns.FromPandas = MakePandasDataFrame
 
         try:
-            # Inject Experimental.Distributed package into namespace RDF if available
-            ns.Experimental.Distributed = _create_rdf_experimental_distributed_module(ns.Experimental)
+            # Inject Pythonizations to interact between local and distributed RDF package
+            from ._pythonization._rdf_namespace import _create_distributed_module, _rungraphs, _variationsfor
+            ns.Experimental.Distributed = _create_distributed_module(ns.Experimental)
+            ns.RunGraphs = _rungraphs(ns.Experimental.Distributed.RunGraphs, ns.RunGraphs)
+            ns.Experimental.VariationsFor = _variationsfor(ns.Experimental.Distributed.VariationsFor, ns.Experimental.VariationsFor)
         except ImportError:
             pass
         except:
             raise Exception("Failed to pythonize the namespace RDF")
         del type(self).RDF
         return ns
 
+    @property
+    def RDataFrame(self):
+        """
+        Dispatch between the local and distributed RDataFrame depending on
+        input arguments.
+        """
+        local_rdf = self.__getattr__("RDataFrame")
+        try:
+            import DistRDF
+            from ._pythonization._rdf_namespace import _rdataframe
+            return _rdataframe(local_rdf, DistRDF.RDataFrame)
+        except ImportError:
+            return local_rdf
+
 
     # Overload RooFit namespace
     @property
     def RooFit(self):
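The net effect of the new property, sketched (not part of the commit): when DistRDF or its dependencies are not installed, the ImportError is swallowed and ROOT.RDataFrame resolves to the plain local class, so existing code is unaffected:

    import ROOT

    # No 'executor' keyword: plain local RDataFrame, as before this commit.
    df = ROOT.RDataFrame(100)
    print(df.Count().GetValue())  # 100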
85 changes: 85 additions & 0 deletions bindings/pyroot/pythonizations/python/ROOT/_pythonization/_rdf_namespace.py

@@ -0,0 +1,85 @@
+# Author: Vincenzo Eduardo Padulano CERN 10/2024
+
+################################################################################
+# Copyright (C) 1995-2024, Rene Brun and Fons Rademakers. #
+# All rights reserved. #
+# #
+# For the licensing terms see $ROOTSYS/LICENSE. #
+# For the list of contributors see $ROOTSYS/README/CREDITS. #
+################################################################################
+
+"""
+This module contains utilities to help in the organization of the RDataFrame
+namespace and the interaction between the C++ and Python functionalities
+"""
+
+
+def _create_distributed_module(parent):
+    """
+    Create the ROOT.RDF.Experimental.Distributed python module.
+
+    This module will be injected into the ROOT.RDF namespace.
+
+    Arguments:
+        parent: The ROOT.RDF namespace. Needed to define __package__.
+
+    Returns:
+        types.ModuleType: The ROOT.RDF.Experimental.Distributed submodule.
+    """
+    import DistRDF
+
+    return DistRDF.create_distributed_module(parent)
+
+
+def _rungraphs(distrdf_rungraphs, rdf_rungraphs):
+    """
+    Create a callable that correctly dispatches either to the local or
+    distributed version of RunGraphs.
+    """
+
+    def rungraphs(handles):
+        # Caveat: we should not call `hasattr` on the result pointer, since
+        # this will implicitly trigger the connected computation graph
+        if len(handles) > 0 and "DistRDF" in str(type(handles[0])):
+            return distrdf_rungraphs(handles)
+        else:
+            return rdf_rungraphs(handles)
+
+    return rungraphs
+
+
+def _variationsfor(distrdf_variationsfor, rdf_variationsfor):
+    """
+    Create a callable that correctly dispatches either to the local or
+    distributed version of VariationsFor.
+    """
+
+    def variationsfor(resptr):
+        # Caveat: we should not call `hasattr` on the result pointer, since
+        # this will implicitly trigger the connected computation graph
+        if "DistRDF" in str(type(resptr)):
+            return distrdf_variationsfor(resptr)
+        else:
+            # Help local VariationsFor with the type of the value held by the result pointer
+            inner_type = type(resptr).__name__
+            inner_type = inner_type[
+                inner_type.index("<") + 1: inner_type.rindex(">")]
+            return rdf_variationsfor[inner_type](resptr)
+
+    return variationsfor
+
+
+def _rdataframe(local_rdf, distributed_rdf):
+    """
+    Create a callable that correctly dispatches either to the local or
+    distributed RDataFrame constructor, depending on whether the "executor"
+    keyword argument is absent or not.
+    """
+
+    def rdataframe(*args, **kwargs):
+        if kwargs.get("executor", None) is not None:
+            return distributed_rdf(*args, **kwargs)
+        else:
+            return local_rdf(*args, **kwargs)
+
+    return rdataframe
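To illustrate the unified VariationsFor entry point dispatching to the local implementation, with the template argument deduced from the proxy's type name — a sketch (not part of the commit) using standard RDataFrame APIs:

    import ROOT

    df = ROOT.RDataFrame(10).Define("x", "1.")
    nominal = df.Vary("x", "ROOT::RVecD{x*0.9, x*1.1}", ["down", "up"]).Histo1D("x")

    # `nominal` is not a DistRDF proxy, so this resolves to the local
    # ROOT.RDF.Experimental.VariationsFor with the deduced inner type.
    variations = ROOT.RDF.Experimental.VariationsFor(nominal)
    print(variations["x:up"].GetMean())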