Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[df] Unify local and distributed API #16681

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@
# @date 2021-11

################################################################################
# Copyright (C) 1995-2021, Rene Brun and Fons Rademakers. #
# Copyright (C) 1995-2024, Rene Brun and Fons Rademakers. #
# All rights reserved. #
# #
# For the licensing terms see $ROOTSYS/LICENSE. #
# For the list of contributors see $ROOTSYS/README/CREDITS. #
################################################################################
from __future__ import annotations
import warnings


def RDataFrame(*args, **kwargs):
"""
Expand All @@ -18,6 +20,29 @@ def RDataFrame(*args, **kwargs):

from DistRDF.Backends.Dask import Backend
daskclient = kwargs.get("daskclient", None)
daskbackend = Backend.DaskBackend(daskclient=daskclient)
executor = kwargs.get("executor", None)
msg_warn = (
"The keyword argument 'daskclient' is not necessary anymore and will "
"be removed in a future release. Use 'executor' instead to provide the "
"distributed.Client object."
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

last sentence I would just change order to make it more clear:
"To provide the distributed.Client object, use 'executor' instead".

)
msg_err = (
"Both the 'daskclient' and 'executor' keyword arguments were provided. "
"This is not supported. Please provide only the 'executor' argument."
)

if daskclient is not None:
warnings.warn(msg_warn, FutureWarning)
if executor is not None:
raise ValueError(msg_err)
else:
executor = daskclient
daskclient = None

if executor is not None and daskclient is not None:
warnings.warn(msg_warn, FutureWarning)
raise ValueError(msg_err)

daskbackend = Backend.DaskBackend(daskclient=executor)

return daskbackend.make_dataframe(*args, **kwargs)
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
## @author Vincenzo Eduardo Padulano
# @author Vincenzo Eduardo Padulano
# @author Enric Tejedor
# @date 2021-02

################################################################################
# Copyright (C) 1995-2021, Rene Brun and Fons Rademakers. #
# Copyright (C) 1995-2024, Rene Brun and Fons Rademakers. #
# All rights reserved. #
# #
# For the licensing terms see $ROOTSYS/LICENSE. #
# For the list of contributors see $ROOTSYS/README/CREDITS. #
################################################################################
from __future__ import annotations
import warnings


def RDataFrame(*args, **kwargs):
"""
Expand All @@ -18,6 +20,29 @@ def RDataFrame(*args, **kwargs):

from DistRDF.Backends.Spark import Backend
sparkcontext = kwargs.get("sparkcontext", None)
spark = Backend.SparkBackend(sparkcontext=sparkcontext)
executor = kwargs.get("executor", None)
msg_warn = (
"The keyword argument 'sparkcontext' is not necessary anymore and will "
"be removed in a future release. Use 'executor' instead to provide the "
"SparkContext object."
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To provide the SparkContext object, use 'executor' instead.

msg_err = (
"Both the 'sparkcontext' and 'executor' keyword arguments were provided. "
"This is not supported. Please provide only the 'executor' argument."
)

if sparkcontext is not None:
warnings.warn(msg_warn, FutureWarning)
if executor is not None:
raise ValueError(msg_err)
else:
executor = sparkcontext
sparkcontext = None

if executor is not None and sparkcontext is not None:
warnings.warn(msg_warn, FutureWarning)
raise ValueError(msg_err)

spark = Backend.SparkBackend(sparkcontext=executor)

return spark.make_dataframe(*args, **kwargs)
38 changes: 33 additions & 5 deletions bindings/experimental/distrdf/python/DistRDF/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# @date 2021-02

################################################################################
# Copyright (C) 1995-2022, Rene Brun and Fons Rademakers. #
# Copyright (C) 1995-2024, Rene Brun and Fons Rademakers. #
# All rights reserved. #
# #
# For the licensing terms see $ROOTSYS/LICENSE. #
Expand Down Expand Up @@ -68,19 +68,17 @@ def RunGraphs(proxies: Iterable) -> int:

@code{.py}
import ROOT
RDataFrame = ROOT.RDF.Experimental.Distributed.Dask.RDataFrame
RunGraphs = ROOT.RDF.Experimental.Distributed.RunGraphs

# Create 4 different dataframes and book a histogram on each one
histoproxies = [
RDataFrame(100)
ROOT.RDataFrame(100, executor=SupportedExecutor(...))
.Define("x", "rdfentry_")
.Histo1D(("name", "title", 10, 0, 100), "x")
for _ in range(4)
]

# Execute the 4 computation graphs
n_graphs_run = RunGraphs(histoproxies)
n_graphs_run = ROOT.RDF.RunGraphs(histoproxies)
# Retrieve all the histograms in one go
histos = [histoproxy.GetValue() for histoproxy in histoproxies]
@endcode
Expand Down Expand Up @@ -138,3 +136,33 @@ def create_distributed_module(parentmodule):
distributed.LiveVisualize = LiveVisualize

return distributed


def RDataFrame(*args, **kwargs):
    """
    Create a distributed RDataFrame, dispatching to the backend that matches
    the type of the 'executor' keyword argument.

    Arguments:
        *args: Positional arguments forwarded to the backend RDataFrame.
        **kwargs: Keyword arguments forwarded to the backend RDataFrame.
            Must include a non-None 'executor' (a distributed.Client or a
            pyspark.SparkContext).

    Returns:
        The distributed RDataFrame created by the matching backend.

    Raises:
        ValueError: If the 'executor' keyword argument is missing or None.
        TypeError: If the executor object is not of a supported type.
    """
    executor = kwargs.get("executor", None)
    if executor is None:
        # Fixed: the message used to refer to a non-existent 'client'
        # keyword; the supported keyword argument is 'executor'.
        raise ValueError(
            "Missing keyword argument 'executor'. Please provide a connection object "
            "to one of the schedulers supported by distributed RDataFrame."
        )

    # Try to dispatch to the correct distributed scheduler implementation.
    # Each backend is optional: a failed import only means that scheduler is
    # not available in this environment, so the next one is tried.
    # The backend constructors are imported under aliases so they do not
    # shadow this function's own name.
    try:
        from distributed import Client
        from DistRDF.Backends.Dask import RDataFrame as DaskRDataFrame
        if isinstance(executor, Client):
            return DaskRDataFrame(*args, **kwargs)
    except ImportError:
        pass

    try:
        from pyspark import SparkContext
        from DistRDF.Backends.Spark import RDataFrame as SparkRDataFrame
        if isinstance(executor, SparkContext):
            return SparkRDataFrame(*args, **kwargs)
    except ImportError:
        pass

    raise TypeError(
        f"The executor object of type '{type(executor)}' is not a supported "
        "connection type for distributed RDataFrame.")
3 changes: 2 additions & 1 deletion bindings/pyroot/pythonizations/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ if(dataframe)
ROOT/_pythonization/_rdataframe.py
ROOT/_pythonization/_rdfdescription.py
ROOT/_pythonization/_rdf_conversion_maps.py
ROOT/_pythonization/_rdf_pyz.py)
ROOT/_pythonization/_rdf_pyz.py
ROOT/_pythonization/_rdf_namespace.py)
endif()

if(roofit)
Expand Down
39 changes: 20 additions & 19 deletions bindings/pyroot/pythonizations/python/ROOT/_facade.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,23 +47,6 @@ def __setattr__(self, name, value):
return setattr(self._gROOT, name, value)


def _create_rdf_experimental_distributed_module(parent):
"""
Create the ROOT.RDF.Experimental.Distributed python module.

This module will be injected into the ROOT.RDF namespace.

Arguments:
parent: The ROOT.RDF namespace. Needed to define __package__.

Returns:
types.ModuleType: The ROOT.RDF.Experimental.Distributed submodule.
"""
import DistRDF

return DistRDF.create_distributed_module(parent)


def _subimport(name):
# type: (str) -> types.ModuleType
"""
Expand Down Expand Up @@ -358,15 +341,33 @@ def MakePandasDataFrame(df):
ns.FromPandas = MakePandasDataFrame

try:
# Inject Experimental.Distributed package into namespace RDF if available
ns.Experimental.Distributed = _create_rdf_experimental_distributed_module(ns.Experimental)
# Inject Pythonizations to interact between local and distributed RDF package
from ._pythonization._rdf_namespace import _create_distributed_module, _rungraphs, _variationsfor
ns.Experimental.Distributed = _create_distributed_module(ns.Experimental)
ns.RunGraphs = _rungraphs(ns.Experimental.Distributed.RunGraphs, ns.RunGraphs)
ns.Experimental.VariationsFor = _variationsfor(ns.Experimental.Distributed.VariationsFor, ns.Experimental.VariationsFor)
except ImportError:
pass
except:
raise Exception("Failed to pythonize the namespace RDF")
del type(self).RDF
return ns

@property
def RDataFrame(self):
    """
    Dispatch between the local and distributed RDataFrame depending on
    input arguments.

    When the optional DistRDF package is importable, returns a wrapper
    that routes calls with a non-None 'executor' keyword argument to the
    distributed implementation; otherwise returns the local constructor.
    """
    # Fetch the plain RDataFrame through the generic attribute lookup
    # (this property shadows the name on the facade, so a direct
    # attribute access would recurse into the property itself).
    local_rdf = self.__getattr__("RDataFrame")
    try:
        # DistRDF is an optional component: fall back to the purely
        # local constructor when it is not installed.
        import DistRDF
        from ._pythonization._rdf_namespace import _rdataframe
        return _rdataframe(local_rdf, DistRDF.RDataFrame)
    except ImportError:
        return local_rdf


# Overload RooFit namespace
@property
def RooFit(self):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
# Author: Vincenzo Eduardo Padulano CERN 10/2024

################################################################################
# Copyright (C) 1995-2024, Rene Brun and Fons Rademakers. #
# All rights reserved. #
# #
# For the licensing terms see $ROOTSYS/LICENSE. #
# For the list of contributors see $ROOTSYS/README/CREDITS. #
################################################################################

"""
This module contains utilities to help in the organization of the RDataFrame
namespace and the interaction between the C++ and Python functionalities
"""


def _create_distributed_module(parent):
    """
    Build the ROOT.RDF.Experimental.Distributed Python module.

    The returned module is meant to be injected into the ROOT.RDF
    namespace by the caller.

    Arguments:
        parent: The ROOT.RDF namespace. Needed to define __package__.

    Returns:
        types.ModuleType: The ROOT.RDF.Experimental.Distributed submodule.
    """
    # Import lazily: DistRDF is an optional component of the installation.
    from DistRDF import create_distributed_module

    return create_distributed_module(parent)


def _rungraphs(distrdf_rungraphs, rdf_rungraphs):
"""
Create a callable that correctly dispatches either to the local or
distributed version of RunGraphs.
"""

def rungraphs(handles):
# Caveat: we should not call `hasattr` on the result pointer, since
# this will implicitly trigger the connected computation graph
if len(handles) > 0 and "DistRDF" in str(type(handles[0])):
return distrdf_rungraphs(handles)
else:
return rdf_rungraphs(handles)

return rungraphs


def _variationsfor(distrdf_variationsfor, rdf_variationsfor):
"""
Create a callable that correctly dispatches either to the local or
distributed version of VariationsFor.
"""

def variationsfor(resptr):
# Caveat: we should not call `hasattr` on the result pointer, since
# this will implicitly trigger the connected computation graph
if "DistRDF" in str(type(resptr)):
return distrdf_variationsfor(resptr)
else:
# Help local VariationsFor with the type of the value held by the result pointer
inner_type = type(resptr).__name__
inner_type = inner_type[
inner_type.index("<") + 1: inner_type.rindex(">")]
return rdf_variationsfor[inner_type](resptr)

return variationsfor


def _rdataframe(local_rdf, distributed_rdf):
"""
Create a callable that correctly dispatches either to the local or
distributed RDataFrame constructor, depending on whether the "executor"
keyword argument is absent or not.
"""

def rdataframe(*args, **kwargs):
if kwargs.get("executor", None) is not None:
return distributed_rdf(*args, **kwargs)
else:
return local_rdf(*args, **kwargs)

return rdataframe
Loading
Loading