Merge pull request #4995 from FederatedAI/feature-2.0-beta-spark
fate on spark
mgqa34 authored Jul 24, 2023
2 parents c9fb7f1 + 5509887 commit 4ed2830
Showing 4 changed files with 37 additions and 223 deletions.
6 changes: 4 additions & 2 deletions python/fate/arch/computing/spark/_csession.py
```diff
@@ -36,6 +36,8 @@ def __init__(self, session_id):
         self._session_id = session_id
 
     def load(self, uri: URI, schema, options: dict = None) -> "Table":
+        if not options:
+            options = {}
         partitions = options.get("partitions", None)
 
         if uri.scheme == "hdfs":
@@ -47,7 +49,7 @@ def load(self, uri: URI, schema, options: dict = None) -> "Table":
                 in_serialized=in_serialized,
                 id_delimiter=id_delimiter,
             )
-            table.scheme = schema
+            table.schema = schema
             return table
 
         if uri.scheme == "hive":
@@ -61,7 +63,7 @@ def load(self, uri: URI, schema, options: dict = None) -> "Table":
                 db_name=database_name,
                 partitions=partitions,
             )
-            table.scheme = schema
+            table.schema = schema
             return table
 
         if uri.scheme == "file":
```
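Two independent fixes land in `_csession.py`: `load()` previously called `options.get(...)` on a possible `None`, and the loaded table's schema was assigned to a misspelled `scheme` attribute. A minimal sketch of the `None`-default guard, using illustrative names rather than FATE's actual signatures:

```python
# A sketch of the guard pattern from this hunk; `load` and its arguments are
# illustrative stand-ins, not FATE's real API.
def load(uri: str, schema: dict, options: dict = None):
    # Defaulting to None avoids the shared-mutable-default pitfall, but the
    # body must normalize before use, or options.get() raises AttributeError.
    if not options:
        options = {}
    partitions = options.get("partitions", None)
    return {"uri": uri, "schema": schema, "partitions": partitions}

print(load("hdfs:///tmp/example", {"label_name": "y"}))  # works without options
```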
26 changes: 13 additions & 13 deletions
```diff
@@ -50,19 +50,19 @@ def write(self, df: "DataFrame", name=None, namespace=None):
 class DataframeReader(_ArtifactTypeReader):
     def read(self) -> "DataFrame":
         logger.debug(f"start reading dataframe from artifact: {self.artifact}")
-        if self.artifact.uri.scheme == "file":
-            import inspect
-
-            from fate.arch import dataframe
-
-            kwargs = {}
-            p = inspect.signature(dataframe.CSVReader.__init__).parameters
-            parameter_keys = p.keys()
-            for k, v in self.artifact.metadata.metadata.items():
-                if k in parameter_keys:
-                    kwargs[k] = v
-
-            return dataframe.CSVReader(**kwargs).to_frame(self.ctx, self.artifact.uri.path)
+        # if self.artifact.uri.scheme == "file":
+        #     import inspect
+        #
+        #     from fate.arch import dataframe
+        #
+        #     kwargs = {}
+        #     p = inspect.signature(dataframe.CSVReader.__init__).parameters
+        #     parameter_keys = p.keys()
+        #     for k, v in self.artifact.metadata.metadata.items():
+        #         if k in parameter_keys:
+        #             kwargs[k] = v
+        #
+        #     return dataframe.CSVReader(**kwargs).to_frame(self.ctx, self.artifact.uri.path)
 
         from fate.arch import dataframe
 
```
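With the `file`-scheme branch commented out, file URIs now fall through to the generic `fate.arch.dataframe` reader below the hunk. The disabled branch relied on an `inspect.signature` trick worth noting: it forwarded only the artifact-metadata entries that `CSVReader.__init__` actually declares. A self-contained sketch of that pattern, with a stub standing in for `dataframe.CSVReader`:

```python
import inspect

def filter_kwargs(cls, metadata: dict) -> dict:
    # Keep only the metadata entries that cls.__init__ declares as parameters,
    # so unexpected keys never reach the constructor.
    accepted = inspect.signature(cls.__init__).parameters.keys()
    return {k: v for k, v in metadata.items() if k in accepted}

class CSVReaderStub:
    # Stand-in for dataframe.CSVReader; only the signature matters here.
    def __init__(self, delimiter=",", label_name=None):
        self.delimiter = delimiter
        self.label_name = label_name

kwargs = filter_kwargs(CSVReaderStub, {"delimiter": ";", "sample_id_name": "id"})
assert kwargs == {"delimiter": ";"}
```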
224 changes: 18 additions & 206 deletions python/fate/components/core/spec/logger.py
```diff
@@ -12,221 +12,33 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
 import logging
 import logging.config
-import os
-import pathlib
-from typing import Literal
+from typing import Optional
 
 import pydantic
 
 
-class PipelineLogger(pydantic.BaseModel):
-    class PipelineLoggerMetadata(pydantic.BaseModel):
-        basepath: pydantic.DirectoryPath
-        level: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
-        debug_mode: bool = False
-
-        @pydantic.validator("basepath", pre=True)
-        def create_basepath(cls, value):
-            pathlib.Path(value).mkdir(parents=True, exist_ok=True)
-            return value
-
-    type: Literal["pipeline"]
-    metadata: PipelineLoggerMetadata
+class LoggerConfig(pydantic.BaseModel):
+    config: Optional[dict] = None
 
     def install(self):
-        self.metadata.basepath.mkdir(parents=True, exist_ok=True)
-        levels = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
-        formatters = {"brief": {"format": "'%(asctime)s %(levelname)-8s %(name)s:%(lineno)s %(message)s'"}}
-        handlers = {}
-        filters = {}
-
-        def add_file_handler(
-            name,
-            filename,
-            level,
-            formater="brief",
-            filters=[],
-        ):
-            handlers[name] = {
-                "class": "logging.FileHandler",
-                "level": level,
-                "formatter": formater,
-                "filters": filters,
-                "filename": filename,
-            }
-
-        # add root logger
-        root_handlers = []
-        root_base_path = self.metadata.basepath.joinpath("root")
-        root_base_path.mkdir(parents=True, exist_ok=True)
-        for level in levels:
-            handler_name = f"root_{level.lower()}"
-            add_file_handler(
-                name=handler_name,
-                filename=root_base_path.joinpath(level),
-                level=level,
-            )
-            root_handlers.append(handler_name)
-
-        # add console logger
-        if self.metadata.debug_mode:
-            handler_name = f"root_console_{self.metadata.level.lower()}"
-            handlers[handler_name] = {
-                # "class": "logging.StreamHandler",
-                "class": "rich.logging.RichHandler",
-                # "formatter": "brief",
-                "level": self.metadata.level,
-                "filters": [],
-                # "stream": "ext://sys.stdout",
-            }
-            root_handlers.append(handler_name)
-
-        # add component_desc logger
-        # component_handlers = []
-        # component_base_path = self.metadata.basepath.joinpath("component_desc")
-        # component_base_path.mkdir(parents=True, exist_ok=True)
-        # filters["components"] = {"name": "fate.components"}
-        # filters["ml"] = {"name": "fate.ml"}
-        # for level in levels:
-        #     handler_name = f"component_{level.lower()}"
-        #     add_file_handler(
-        #         name=handler_name,
-        #         filename=component_base_path.joinpath(level),
-        #         level=level,
-        #     )
-        #     component_handlers.append(handler_name)
-        # component_loggers = {
-        #     "fate.components": dict(
-        #         handlers=component_handlers,
-        #         filters=["components"],
-        #         level=self.metadata.level,
-        #     ),
-        #     "fate.ml": dict(
-        #         handlers=component_handlers,
-        #         filters=["ml"],
-        #         level=self.metadata.level,
-        #     ),
-        # }
-
-        logging.config.dictConfig(
-            dict(
+        if self.config is None:
+            handler_name = "rich_handler"
+            self.config = dict(
                 version=1,
-                formatters=formatters,
-                handlers=handlers,
-                filters=filters,
-                # loggers=component_loggers,
+                formatters={},
+                handlers={
+                    handler_name: {
+                        "class": "rich.logging.RichHandler",
+                        "level": "DEBUG",
+                        "filters": [],
+                    }
+                },
+                filters={},
+                loggers={},
-                root=dict(handlers=root_handlers, level=self.metadata.level),
-                disable_existing_loggers=False,
-            )
-        )
-
-
-class FlowLogger(pydantic.BaseModel):
-    class FlowLoggerMetadata(pydantic.BaseModel):
-        basepath: pydantic.DirectoryPath
-        level: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
-
-        @pydantic.validator("basepath", pre=True)
-        def create_basepath(cls, value):
-            pathlib.Path(value).mkdir(parents=True, exist_ok=True)
-            return value
-
-    type: Literal["flow"]
-    metadata: FlowLoggerMetadata
-
-    def install(self):
-        self.metadata.basepath.mkdir(parents=True, exist_ok=True)
-        levels = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
-        formatters = {"brief": {"format": "'%(asctime)s %(levelname)-8s %(name)s:%(lineno)s %(message)s'"}}
-        handlers = {}
-        filters = {}
-
-        def add_file_handler(name, filename, level, formater="brief", filters=[]):
-            handlers[name] = {
-                "class": "logging.FileHandler",
-                "level": level,
-                "formatter": formater,
-                "filters": filters,
-                "filename": filename,
-            }
-
-        # add root logger
-        root_handlers = []
-        root_base_path = self.metadata.basepath.joinpath("root")
-        root_base_path.mkdir(parents=True, exist_ok=True)
-        for level in levels:
-            handler_name = f"root_{level.lower()}"
-            add_file_handler(
-                name=handler_name,
-                filename=root_base_path.joinpath(level),
-                level=level,
-            )
-            root_handlers.append(handler_name)
-
-        # add console logger when debug_mode is True
-        if os.environ.get("COMPONENT_DEBUG_MODE", "false").lower() == "true":
-            handler_name = f"root_console_{self.metadata.level.lower()}"
-            handlers[handler_name] = {
-                # "class": "logging.StreamHandler",
-                "class": "rich.logging.RichHandler",
-                # "formatter": "brief",
-                "level": self.metadata.level,
-                "filters": [],
-                # "stream": "ext://sys.stdout",
-            }
-            root_handlers.append(handler_name)
-
-        # add component_desc logger
-        # component_handlers = []
-        # component_base_path = self.metadata.basepath.joinpath("component_desc")
-        # component_base_path.mkdir(parents=True, exist_ok=True)
-        # filters["components"] = {"name": "fate.components"}
-        # filters["ml"] = {"name": "fate.ml"}
-        # for level in levels:
-        #     handler_name = f"component_{level.lower()}"
-        #     add_file_handler(
-        #         name=handler_name,
-        #         filename=component_base_path.joinpath(level),
-        #         level=level,
-        #     )
-        #     component_handlers.append(handler_name)
-        loggers = {}
-        # component_loggers = {
-        #     "fate.components": dict(
-        #         handlers=component_handlers,
-        #         filters=["components"],
-        #         level=self.metadata.level,
-        #     ),
-        #     "fate.ml": dict(
-        #         handlers=component_handlers,
-        #         filters=["ml"],
-        #         level=self.metadata.level,
-        #     ),
-        # }
-        # loggers.update(component_loggers)
-        #
-        logging.config.dictConfig(
-            dict(
-                version=1,
-                formatters=formatters,
-                handlers=handlers,
-                filters=filters,
-                loggers=loggers,
-                root=dict(handlers=root_handlers, level=self.metadata.level),
+                root=dict(handlers=[handler_name], level="DEBUG"),
                 disable_existing_loggers=False,
             )
-        )
-
-
-class CustomLogger(pydantic.BaseModel):
-    class CustomLoggerMetadata(pydantic.BaseModel):
-        config_dict: dict
-
-    type: Literal["custom"]
-    metadata: CustomLoggerMetadata
-
-    def install(self):
-        logging.config.dictConfig(self.metadata.config_dict)
+        logging.config.dictConfig(self.config)
```
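This rewrite collapses `PipelineLogger`, `FlowLogger`, and `CustomLogger` into one `LoggerConfig`: when no `config` dict is supplied, `install()` synthesizes a root-level `rich.logging.RichHandler` setup at DEBUG; any supplied dict is passed to `logging.config.dictConfig` verbatim, and the old per-level file handlers under `basepath` are gone. A usage sketch (import path taken from this diff; assumes `rich` and `pydantic` are installed):

```python
import logging

from fate.components.core.spec.logger import LoggerConfig

# Default: install() builds the RichHandler dictConfig shown in the hunk above.
LoggerConfig().install()
logging.getLogger("demo").debug("rendered by rich.logging.RichHandler")

# Custom: any dictConfig-compatible mapping is applied unchanged.
LoggerConfig(config={
    "version": 1,
    "handlers": {"console": {"class": "logging.StreamHandler", "level": "INFO"}},
    "root": {"handlers": ["console"], "level": "INFO"},
    "disable_existing_loggers": False,
}).install()
logging.getLogger("demo").info("now on a plain StreamHandler")
```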
4 changes: 2 additions & 2 deletions python/fate/components/core/spec/task.py
```diff
@@ -27,7 +27,7 @@
     RollSiteFederationSpec,
     StandaloneFederationSpec,
 )
-from .logger import CustomLogger, FlowLogger, PipelineLogger
+from .logger import LoggerConfig
 
 
 class TaskConfigSpec(pydantic.BaseModel):
@@ -41,7 +41,7 @@ class TaskConfSpec(pydantic.BaseModel):
         PulsarFederationSpec,
         OSXFederationSpec,
     ]
-    logger: Union[PipelineLogger, FlowLogger, CustomLogger]
+    logger: LoggerConfig
     task_final_meta_path: pydantic.FilePath = pydantic.Field(default_factory=lambda: os.path.abspath(os.getcwd()))
 
     task_id: str
```
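Since `logger` is now a plain `LoggerConfig` instead of a `Union` discriminated on `type`, submitted task confs shrink to an optional `config` dict. A sketch of the parsing behavior; `ConfSlice` is a hypothetical stand-in for the logger slice of `TaskConfSpec`, and only `LoggerConfig` comes from this diff:

```python
import pydantic

from fate.components.core.spec.logger import LoggerConfig

class ConfSlice(pydantic.BaseModel):
    # Hypothetical stand-in for TaskConfSpec's logger field.
    logger: LoggerConfig

ConfSlice.parse_obj({"logger": {}})                          # rich default at install()
ConfSlice.parse_obj({"logger": {"config": {"version": 1}}})  # explicit dictConfig
```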
