Skip to content

Commit

Permalink
Add data split mode to DMatrix MetaInfo (#8568)
Browse files Browse the repository at this point in the history
  • Loading branch information
rongou authored Dec 25, 2022
1 parent 77b069c commit 3ceeb8c
Show file tree
Hide file tree
Showing 20 changed files with 113 additions and 103 deletions.
1 change: 0 additions & 1 deletion doc/tutorials/saving_model.rst
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,6 @@ Will print out something similar to (not actual output as it's too long for demo
"learner_train_param": {
"booster": "gbtree",
"disable_default_eval_metric": "0",
"dsplit": "auto",
"objective": "reg:squarederror"
},
"metrics": [],
Expand Down
16 changes: 16 additions & 0 deletions include/xgboost/c_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,28 @@ XGB_DLL int XGBGetGlobalConfig(char const **out_config);

/*!
 * \brief Load a data matrix from a file.
 * \deprecated since 2.0.0
 * \see XGDMatrixCreateFromURI()
 * \param fname the name of the file
 * \param silent controls whether messages are printed during loading; any non-zero value
 *               is passed to the loader as `silent = true`
 * \param out receives the handle of the loaded data matrix
 * \return 0 on success, -1 on failure
 */
XGB_DLL int XGDMatrixCreateFromFile(const char *fname, int silent, DMatrixHandle *out);

/*!
 * \brief Load a data matrix from a URI, configured through a JSON document.
 * \param config JSON encoded parameters for DMatrix construction. Accepted fields are:
 *   - uri: String, required. The URI of the input file.
 *   - silent (optional): Integer flag controlling whether messages are printed during
 *     loading; a non-zero value is treated as silent. Defaults to 1 (true).
 *   - data_split_mode (optional): Integer selecting whether to split by row (0) or by
 *     column (1). In distributed mode, the file is split accordingly; otherwise this is
 *     only an indicator of how the file was split beforehand. Defaults to 0 (row).
 * \param out receives the handle of the loaded data matrix
 * \return 0 on success, -1 on failure
 */
XGB_DLL int XGDMatrixCreateFromURI(char const *config, DMatrixHandle *out);

/**
* @example c-api-demo.c
*/
Expand Down
13 changes: 7 additions & 6 deletions include/xgboost/data.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@ enum class DataType : uint8_t {

enum class FeatureType : uint8_t { kNumerical = 0, kCategorical = 1 };

enum class DataSplitMode : int {
kAuto = 0, kCol = 1, kRow = 2, kNone = 3
};
/*!
 * \brief Whether data is split by row (kRow) or by column (kCol).
 *
 * The numeric values are exchanged with callers as plain integers (the `data_split_mode`
 * field of the JSON config accepted by XGDMatrixCreateFromURI), so they must stay stable.
 */
enum class DataSplitMode : int { kRow = 0, kCol = 1 };

/*!
* \brief Meta information about dataset, always sit in memory.
Expand All @@ -60,6 +58,8 @@ class MetaInfo {
uint64_t num_nonzero_{0}; // NOLINT
/*! \brief label of each instance */
linalg::Tensor<float, 2> labels;
/*! \brief data split mode */
DataSplitMode data_split_mode{DataSplitMode::kRow};
/*!
* \brief the index of begin and end of a group
* needed when the learning task is ranking.
Expand Down Expand Up @@ -544,15 +544,16 @@ class DMatrix {
* \brief Load DMatrix from URI.
* \param uri The URI of input.
* \param silent Whether print information during loading.
* \param data_split_mode Mode to read in part of the data, divided among the workers in distributed mode.
* \param data_split_mode In distributed mode, split the input according to this mode; otherwise,
* it's just an indicator of how the input was split beforehand.
* \param file_format The format type of the file, used for dmlc::Parser::Create.
* By default "auto" will be able to load in both local binary file.
* \param page_size Page size for external memory.
* \return The created DMatrix.
*/
static DMatrix* Load(const std::string& uri,
bool silent,
DataSplitMode data_split_mode,
bool silent = true,
DataSplitMode data_split_mode = DataSplitMode::kRow,
const std::string& file_format = "auto");

/**
Expand Down
11 changes: 11 additions & 0 deletions python-package/xgboost/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import warnings
from abc import ABC, abstractmethod
from collections.abc import Mapping
from enum import IntEnum, unique
from functools import wraps
from inspect import Parameter, signature
from typing import (
Expand Down Expand Up @@ -608,6 +609,13 @@ def inner_f(*args: Any, **kwargs: Any) -> _T:
_deprecate_positional_args = require_keyword_args(False)


@unique
class DataSplitMode(IntEnum):
    """Supported data split mode for DMatrix.

    The integer value of the chosen member is what is written into the
    ``data_split_mode`` field of the JSON config when a DMatrix is created from a URI.
    """

    # Data is split by row.
    ROW = 0
    # Data is split by column.
    COL = 1


class DMatrix: # pylint: disable=too-many-instance-attributes,too-many-public-methods
"""Data Matrix used in XGBoost.
Expand Down Expand Up @@ -635,6 +643,7 @@ def __init__(
label_upper_bound: Optional[ArrayLike] = None,
feature_weights: Optional[ArrayLike] = None,
enable_categorical: bool = False,
data_split_mode: DataSplitMode = DataSplitMode.ROW,
) -> None:
"""Parameters
----------
Expand Down Expand Up @@ -728,6 +737,7 @@ def __init__(
feature_names=feature_names,
feature_types=feature_types,
enable_categorical=enable_categorical,
data_split_mode=data_split_mode,
)
assert handle is not None
self.handle = handle
Expand Down Expand Up @@ -1332,6 +1342,7 @@ def __init__( # pylint: disable=super-init-not-called
label_upper_bound: Optional[ArrayLike] = None,
feature_weights: Optional[ArrayLike] = None,
enable_categorical: bool = False,
data_split_mode: DataSplitMode = DataSplitMode.ROW,
) -> None:
self.max_bin: int = max_bin if max_bin is not None else 256
self.missing = missing if missing is not None else np.nan
Expand Down
14 changes: 10 additions & 4 deletions python-package/xgboost/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from .core import (
_LIB,
DataIter,
DataSplitMode,
DMatrix,
_check_call,
_cuda_array_interface,
Expand Down Expand Up @@ -865,13 +866,17 @@ def _from_uri(
missing: Optional[FloatCompatible],
feature_names: Optional[FeatureNames],
feature_types: Optional[FeatureTypes],
data_split_mode: DataSplitMode = DataSplitMode.ROW,
) -> DispatchedDataBackendReturnType:
_warn_unused_missing(data, missing)
handle = ctypes.c_void_p()
data = os.fspath(os.path.expanduser(data))
_check_call(_LIB.XGDMatrixCreateFromFile(c_str(data),
ctypes.c_int(1),
ctypes.byref(handle)))
args = {
"uri": str(data),
"data_split_mode": int(data_split_mode),
}
config = bytes(json.dumps(args), "utf-8")
_check_call(_LIB.XGDMatrixCreateFromURI(config, ctypes.byref(handle)))
return handle, feature_names, feature_types


Expand Down Expand Up @@ -938,6 +943,7 @@ def dispatch_data_backend(
feature_names: Optional[FeatureNames],
feature_types: Optional[FeatureTypes],
enable_categorical: bool = False,
data_split_mode: DataSplitMode = DataSplitMode.ROW,
) -> DispatchedDataBackendReturnType:
'''Dispatch data for DMatrix.'''
if not _is_cudf_ser(data) and not _is_pandas_series(data):
Expand All @@ -953,7 +959,7 @@ def dispatch_data_backend(
if _is_numpy_array(data):
return _from_numpy_array(data, missing, threads, feature_names, feature_types)
if _is_uri(data):
return _from_uri(data, missing, feature_names, feature_types)
return _from_uri(data, missing, feature_names, feature_types, data_split_mode)
if _is_list(data):
return _from_list(data, missing, threads, feature_names, feature_types)
if _is_tuple(data):
Expand Down
30 changes: 21 additions & 9 deletions src/c_api/c_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -206,17 +206,29 @@ XGB_DLL int XGBGetGlobalConfig(const char** json_str) {
}

/*!
 * \brief Deprecated file-based loader, kept as a thin wrapper over XGDMatrixCreateFromURI().
 *
 * Packs `fname` and `silent` into the JSON config consumed by XGDMatrixCreateFromURI() and
 * delegates the actual loading to it. No `data_split_mode` field is written, so the URI
 * loader's default for that field applies.
 *
 * \param fname the name (URI) of the file to load; must not be null
 * \param silent forwarded unchanged as the integer `silent` field of the config
 * \param out receives the handle produced by XGDMatrixCreateFromURI(); must not be null
 * \return the status code returned by XGDMatrixCreateFromURI()
 */
XGB_DLL int XGDMatrixCreateFromFile(const char *fname, int silent, DMatrixHandle *out) {
  // NOTE(review): API_BEGIN/API_END are used as a matched pair around the whole body, the
  // same way XGDMatrixCreateFromURI uses them — confirm against the macro definitions.
  API_BEGIN();
  xgboost_CHECK_C_ARG_PTR(fname);
  xgboost_CHECK_C_ARG_PTR(out);

  // The DMatrix is created exactly once, by the delegated call below. Loading it here as
  // well would store a handle in `*out` that the delegate then overwrites.
  Json config{Object()};
  config["uri"] = std::string{fname};
  config["silent"] = silent;
  std::string config_str;
  Json::Dump(config, &config_str);
  return XGDMatrixCreateFromURI(config_str.c_str(), out);
  API_END();
}

/*!
 * \brief Create a DMatrix from a URI, driven by a JSON encoded configuration.
 *
 * Recognised fields of `config`:
 *  - uri: string, required.
 *  - silent: integer, optional, defaults to 1; converted to bool and passed to DMatrix::Load.
 *  - data_split_mode: integer, optional, defaults to 0; must be the value of a
 *    DataSplitMode enumerator (kRow or kCol).
 *
 * \param config JSON encoded parameters; must not be null
 * \param out receives a newly allocated std::shared_ptr<DMatrix> as the handle; must not be null
 * \return status code of the C API call
 */
XGB_DLL int XGDMatrixCreateFromURI(const char *config, DMatrixHandle *out) {
  API_BEGIN();
  xgboost_CHECK_C_ARG_PTR(config);
  xgboost_CHECK_C_ARG_PTR(out);

  auto jconfig = Json::Load(StringView{config});
  std::string uri = RequiredArg<String>(jconfig, "uri", __func__);
  auto silent = static_cast<bool>(OptionalArg<Integer, int64_t>(jconfig, "silent", 1));

  // The split mode arrives as a raw integer from the caller. Reject unknown values up
  // front instead of storing a DataSplitMode that matches no enumerator.
  auto split_mode_value =
      static_cast<int64_t>(OptionalArg<Integer, int64_t>(jconfig, "data_split_mode", 0));
  if (split_mode_value != static_cast<int64_t>(DataSplitMode::kRow) &&
      split_mode_value != static_cast<int64_t>(DataSplitMode::kCol)) {
    LOG(FATAL) << "Invalid value for `data_split_mode`: " << split_mode_value
               << ", expecting 0 (row) or 1 (col).";
  }
  auto data_split_mode = static_cast<DataSplitMode>(split_mode_value);

  *out = new std::shared_ptr<DMatrix>(DMatrix::Load(uri, silent, data_split_mode));
  API_END();
}

Expand Down
13 changes: 1 addition & 12 deletions src/cli_main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,8 @@ struct CLIParam : public XGBoostParameter<CLIParam> {
DMLC_DECLARE_FIELD(name_pred).set_default("pred.txt")
.describe("Name of the prediction file.");
DMLC_DECLARE_FIELD(dsplit).set_default(0)
.add_enum("auto", 0)
.add_enum("row", 0)
.add_enum("col", 1)
.add_enum("row", 2)
.add_enum("none", 3)
.describe("Data split mode.");
DMLC_DECLARE_FIELD(ntree_limit).set_default(0).set_lower_bound(0)
.describe("(Deprecated) Use iteration_begin/iteration_end instead.");
Expand Down Expand Up @@ -158,15 +156,6 @@ struct CLIParam : public XGBoostParameter<CLIParam> {
if (name_pred == "stdout") {
save_period = 0;
}
if (dsplit == static_cast<int>(DataSplitMode::kAuto)) {
if (collective::IsFederated()) {
dsplit = static_cast<int>(DataSplitMode::kNone);
} else if (collective::IsDistributed()) {
dsplit = static_cast<int>(DataSplitMode::kRow);
} else {
dsplit = static_cast<int>(DataSplitMode::kNone);
}
}
}
};

Expand Down
19 changes: 12 additions & 7 deletions src/data/data.cc
Original file line number Diff line number Diff line change
Expand Up @@ -783,18 +783,22 @@ DMatrix *TryLoadBinary(std::string fname, bool silent) {

DMatrix* DMatrix::Load(const std::string& uri, bool silent, DataSplitMode data_split_mode,
const std::string& file_format) {
CHECK(data_split_mode == DataSplitMode::kRow ||
data_split_mode == DataSplitMode::kCol ||
data_split_mode == DataSplitMode::kNone)
<< "Precondition violated; data split mode can only be 'row', 'col', or 'none'";
auto need_split = false;
if (collective::IsFederated()) {
LOG(CONSOLE) << "XGBoost federated mode detected, not splitting data among workers";
} else if (collective::IsDistributed()) {
LOG(CONSOLE) << "XGBoost distributed mode detected, will split data among workers";
need_split = true;
}

std::string fname, cache_file;
size_t dlm_pos = uri.find('#');
if (dlm_pos != std::string::npos) {
cache_file = uri.substr(dlm_pos + 1, uri.length());
fname = uri.substr(0, dlm_pos);
CHECK_EQ(cache_file.find('#'), std::string::npos)
<< "Only one `#` is allowed in file path for cache file specification.";
if (data_split_mode == DataSplitMode::kRow) {
if (need_split && data_split_mode == DataSplitMode::kRow) {
std::ostringstream os;
std::vector<std::string> cache_shards = common::Split(cache_file, ':');
for (size_t i = 0; i < cache_shards.size(); ++i) {
Expand Down Expand Up @@ -828,7 +832,7 @@ DMatrix* DMatrix::Load(const std::string& uri, bool silent, DataSplitMode data_s
}

int partid = 0, npart = 1;
if (data_split_mode == DataSplitMode::kRow) {
if (need_split && data_split_mode == DataSplitMode::kRow) {
partid = collective::GetRank();
npart = collective::GetWorldSize();
} else {
Expand Down Expand Up @@ -887,7 +891,7 @@ DMatrix* DMatrix::Load(const std::string& uri, bool silent, DataSplitMode data_s
* since partitioned data not knowing the real number of features. */
collective::Allreduce<collective::Operation::kMax>(&dmat->Info().num_col_, 1);

if (data_split_mode == DataSplitMode::kCol) {
if (need_split && data_split_mode == DataSplitMode::kCol) {
if (!cache_file.empty()) {
LOG(FATAL) << "Column-wise data split is not support for external memory.";
}
Expand All @@ -898,6 +902,7 @@ DMatrix* DMatrix::Load(const std::string& uri, bool silent, DataSplitMode data_s
delete dmat;
return sliced;
} else {
dmat->Info().data_split_mode = data_split_mode;
return dmat;
}
}
Expand Down
1 change: 1 addition & 0 deletions src/data/simple_dmatrix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ DMatrix* SimpleDMatrix::SliceCol(std::size_t start, std::size_t size) {
out->Info() = this->Info().Copy();
out->Info().num_nonzero_ = h_offset.back();
}
out->Info().data_split_mode = DataSplitMode::kCol;
return out;
}

Expand Down
Loading

0 comments on commit 3ceeb8c

Please sign in to comment.