From 41a8f5646047e1fb9f460930184e2731d0255fc4 Mon Sep 17 00:00:00 2001 From: Richard Stotz Date: Fri, 25 Oct 2024 09:42:46 -0700 Subject: [PATCH] [TF-DF] [YDF] Remove old Tensorflow Status compatibility PiperOrigin-RevId: 689818683 --- CHANGELOG.md | 6 + tensorflow_decision_forests/keras/BUILD | 1 - .../keras/keras_distributed_test.py | 2 +- .../keras/keras_test.py | 7 +- .../tensorflow/ops/inference/BUILD | 7 +- .../tensorflow/ops/inference/kernel.cc | 276 ++++++++---------- .../tensorflow/ops/inference/op.cc | 45 +-- .../tensorflow/ops/training/BUILD | 7 +- .../ops/training/feature_on_file.cc | 35 ++- .../tensorflow/ops/training/feature_on_file.h | 36 +-- .../tensorflow/ops/training/features.h | 62 ++-- .../tensorflow/ops/training/kernel.cc | 186 ++++++------ .../tensorflow/ops/training/kernel.h | 18 +- .../ops/training/kernel_grpc_worker.cc | 9 +- .../ops/training/kernel_long_process.cc | 31 +- .../tensorflow/ops/training/kernel_on_file.cc | 60 ++-- 16 files changed, 405 insertions(+), 383 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 92411abb..142b138b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog +## HEAD + +### Fix + +- Some errors are now InvalidArgumentError instead of UnknownError. + ## 1.10.0 - 2024-08-21 ### Fix diff --git a/tensorflow_decision_forests/keras/BUILD b/tensorflow_decision_forests/keras/BUILD index 2816c77f..ac25ed5c 100644 --- a/tensorflow_decision_forests/keras/BUILD +++ b/tensorflow_decision_forests/keras/BUILD @@ -215,7 +215,6 @@ py_test( "@pypi_pandas//:pkg", "@pypi_portpicker//:pkg", "@release_or_nightly//:tensorflow_pkg", - "@release_or_nightly//:tf_keras_pkg", # TensorFlow /distribute:distribute_lib, "//tensorflow_decision_forests", "@ydf//yggdrasil_decision_forests/learner/distributed_gradient_boosted_trees:dgbt_py_proto", diff --git a/tensorflow_decision_forests/keras/keras_distributed_test.py b/tensorflow_decision_forests/keras/keras_distributed_test.py index e123aaa1..24de8b86 100644 --- a/tensorflow_decision_forests/keras/keras_distributed_test.py +++ b/tensorflow_decision_forests/keras/keras_distributed_test.py @@ -706,7 +706,7 @@ def test_in_memory_not_supported(self): model = tfdf.keras.DistributedGradientBoostedTreesModel(worker_logs=False) with self.assertRaisesRegex( - tf.errors.UnknownError, + tf.errors.InvalidArgumentError, "does not support training from in-memory datasets", ): model.fit(dataset) diff --git a/tensorflow_decision_forests/keras/keras_test.py b/tensorflow_decision_forests/keras/keras_test.py index 4c24b748..601f7b8c 100644 --- a/tensorflow_decision_forests/keras/keras_test.py +++ b/tensorflow_decision_forests/keras/keras_test.py @@ -2923,7 +2923,8 @@ def test_node_format_blob_sequence(self): def test_check_parameters(self): with self.assertRaisesRegex( - ValueError, 'The parameter "num_trees" is smaller than the minimum' + tf.errors.InvalidArgumentError, + 'The parameter "num_trees" is smaller than the minimum', ): keras.RandomForestModel(num_trees=-10) @@ -3041,7 +3042,7 @@ def test_monotonic_non_compatible_learner(self): pd_dataset = pd.DataFrame({"f": [0, 1], "l": [0, 1]}) tf_dataset = keras.pd_dataframe_to_tf_dataset(pd_dataset, label="l") with self.assertRaisesRegex( - tf.errors.UnknownError, + tf.errors.InvalidArgumentError, "The learner CART does not support monotonic constraints", ): model.fit(tf_dataset) @@ -3053,7 +3054,7 @@ def test_monotonic_non_compatible_options(self): pd_dataset = pd.DataFrame({"f": [0, 1], "l": [0, 1]}) tf_dataset = keras.pd_dataframe_to_tf_dataset(pd_dataset, label="l") with self.assertRaisesRegex( - tf.errors.UnknownError, + tf.errors.InvalidArgumentError, "Gradient Boosted Trees does not support monotonic constraints with" " use_hessian_gain=false", ): diff --git a/tensorflow_decision_forests/tensorflow/ops/inference/BUILD b/tensorflow_decision_forests/tensorflow/ops/inference/BUILD index 36f7dd99..fbe41604 100644 --- a/tensorflow_decision_forests/tensorflow/ops/inference/BUILD +++ b/tensorflow_decision_forests/tensorflow/ops/inference/BUILD @@ -111,7 +111,11 @@ cc_library( name = "op", srcs = OP_SRCS, linkstatic = 1, - deps = OP_DEPS, + deps = OP_DEPS + [ + "@com_google_absl//absl/status", + "@com_google_absl//absl/status:statusor", + "@ydf//yggdrasil_decision_forests/utils:status_macros", + ], alwayslink = 1, ) @@ -133,7 +137,6 @@ cc_library( "@ydf//yggdrasil_decision_forests/utils:compatibility", "@ydf//yggdrasil_decision_forests/utils:distribution_cc_proto", "@ydf//yggdrasil_decision_forests/utils:status_macros", - "@ydf//yggdrasil_decision_forests/utils:tensorflow", ] + select({ "@ydf//yggdrasil_decision_forests:tensorflow_with_header_lib": [ "@release_or_nightly//:tensorflow_libtensorflow_framework", diff --git a/tensorflow_decision_forests/tensorflow/ops/inference/kernel.cc b/tensorflow_decision_forests/tensorflow/ops/inference/kernel.cc index 4bf9f276..daf4cc16 100644 --- a/tensorflow_decision_forests/tensorflow/ops/inference/kernel.cc +++ b/tensorflow_decision_forests/tensorflow/ops/inference/kernel.cc @@ -66,7 +66,6 @@ #include "yggdrasil_decision_forests/model/model_library.h" #include "yggdrasil_decision_forests/utils/distribution.pb.h" #include "yggdrasil_decision_forests/utils/status_macros.h" -#include "yggdrasil_decision_forests/utils/tensorflow.h" namespace tensorflow_decision_forests { namespace ops { @@ -189,36 +188,19 @@ struct OutputLeavesTensors { const int num_trees; }; -// Creates a failed tf::Status. -tf::Status TFStatus(absl::StatusCode code, const absl::string_view message) { - return tf::Status(static_cast(code), message); -} - -tf::Status TFStatusInvalidArgument(const absl::string_view message) { - return TFStatus(absl::StatusCode::kInvalidArgument, message); -} - -tf::Status TFStatusUnimplemented(const absl::string_view message) { - return TFStatus(absl::StatusCode::kUnimplemented, message); -} - -tf::Status TFStatusInternal(const absl::string_view message) { - return TFStatus(absl::StatusCode::kInternal, message); -} - // Converts the vector of item to bitmap representation of the output types. -tf::Status GetOutputTypesBitmap(const std::vector& src_types, - OutputTypesBitmap* dst_types) { +absl::Status GetOutputTypesBitmap(const std::vector& src_types, + OutputTypesBitmap* dst_types) { *dst_types = OutputTypesBitmap(); for (const auto& src_type : src_types) { if (src_type == kOutputTypeLeaves) { dst_types->leaves = true; } else { - return TFStatusInvalidArgument( + return absl::InvalidArgumentError( absl::StrCat("Unknown output types: ", src_type)); } } - return tf::OkStatus(); + return absl::OkStatus(); } // Mapping between feature idx (the index used by simpleML to index features), @@ -230,8 +212,8 @@ class GenericInferenceEngine; class FeatureIndex { public: - tf::Status Initialize(const std::vector& input_features, - const dataset::proto::DataSpecification& data_spec) { + absl::Status Initialize(const std::vector& input_features, + const dataset::proto::DataSpecification& data_spec) { numerical_features_.clear(); boolean_features_.clear(); categorical_int_features_.clear(); @@ -254,13 +236,13 @@ class FeatureIndex { categorical_set_int_features_.push_back(feature_idx); break; default: - return TFStatusUnimplemented(absl::Substitute( + return absl::UnimplementedError(absl::Substitute( "Non supported feature type \"$0\" for feature \"$1\".", dataset::proto::ColumnType_Name(feature_spec.type()), feature_spec.name())); } } - return tf::OkStatus(); + return absl::OkStatus(); } const std::vector& numerical_features() const { @@ -299,14 +281,15 @@ class FeatureIndex { // - tensor_col_idx: Column, in "inputs", containing the feature. // - max_value: Maximum value of the items. Items above or equal to this value // will be considered out-of-vocabulary. -tf::Status ExtractCategoricalSetInt(const InputTensors& inputs, - const FeatureIndex& feature_index, - const int tensor_col_idx, - const int max_value, const int example_idx, - std::vector* values) { +absl::Status ExtractCategoricalSetInt(const InputTensors& inputs, + const FeatureIndex& feature_index, + const int tensor_col_idx, + const int max_value, + const int example_idx, + std::vector* values) { if (inputs.categorical_set_int_features_row_splits_dim_2(example_idx) != example_idx * feature_index.categorical_set_int_features().size()) { - return TFStatusInternal("Unexpected features_row_splits_dim_2 size."); + return absl::InternalError("Unexpected features_row_splits_dim_2 size."); } const int d1_cell = @@ -314,7 +297,7 @@ tf::Status ExtractCategoricalSetInt(const InputTensors& inputs, tensor_col_idx; if (d1_cell + 1 >= inputs.categorical_set_int_features_row_splits_dim_1.size()) { - return TFStatusInternal("Unexpected features_row_splits_dim_1 size."); + return absl::InternalError("Unexpected features_row_splits_dim_1 size."); } const int begin_idx = @@ -334,7 +317,7 @@ tf::Status ExtractCategoricalSetInt(const InputTensors& inputs, } (*values)[item_idx] = value; } - return tf::OkStatus(); + return absl::OkStatus(); } // Wrapping around an inference engine able to run a model. @@ -360,17 +343,17 @@ class AbstractInferenceEngine { // Run the inference of the model and returns its output (e.g. probabilities, // logits, regression). The output tensors are already allocated. - virtual tf::Status RunInference(const InputTensors& inputs, - const FeatureIndex& feature_index, - OutputTensors* outputs, - AbstractCache* cache) const = 0; + virtual absl::Status RunInference(const InputTensors& inputs, + const FeatureIndex& feature_index, + OutputTensors* outputs, + AbstractCache* cache) const = 0; // Run the inference of the model and returns the index of the active leaves. // The output tensors are already allocated. - virtual tf::Status RunInferenceGetLeaves(const InputTensors& inputs, - const FeatureIndex& feature_index, - OutputLeavesTensors* outputs, - AbstractCache* cache) const = 0; + virtual absl::Status RunInferenceGetLeaves(const InputTensors& inputs, + const FeatureIndex& feature_index, + OutputLeavesTensors* outputs, + AbstractCache* cache) const = 0; }; // The generic engine uses the generic serving API @@ -397,16 +380,16 @@ class GenericInferenceEngine : public AbstractInferenceEngine { return cache; } - tf::Status RunInference(const InputTensors& inputs, - const FeatureIndex& feature_index, - OutputTensors* outputs, - AbstractCache* abstract_cache) const override { + absl::Status RunInference(const InputTensors& inputs, + const FeatureIndex& feature_index, + OutputTensors* outputs, + AbstractCache* abstract_cache) const override { // Update the vertical dataset with the input tensors. auto* cache = dynamic_cast(abstract_cache); if (cache == nullptr) { - return TFStatusInternal("Unexpected cache type."); + return absl::InternalError("Unexpected cache type."); } - TF_RETURN_IF_ERROR(SetVerticalDataset(inputs, feature_index, cache)); + RETURN_IF_ERROR(SetVerticalDataset(inputs, feature_index, cache)); // Run the model. model::proto::Prediction prediction; @@ -429,7 +412,7 @@ class GenericInferenceEngine : public AbstractInferenceEngine { if (outputs->output_dim == 1 && !output_is_proba) { // Output the logit of the positive class. if (pred.distribution().counts().size() != 3) { - return TFStatusInternal("Wrong \"distribution\" shape."); + return absl::InternalError("Wrong \"distribution\" shape."); } const float logit = prediction.classification().distribution().counts(2) / @@ -439,7 +422,7 @@ class GenericInferenceEngine : public AbstractInferenceEngine { // Output the logit or probabilities. if (outputs->dense_predictions.dimension(1) != pred.distribution().counts().size() - 1) { - return TFStatusInternal("Wrong \"distribution\" shape."); + return absl::InternalError("Wrong \"distribution\" shape."); } for (int class_idx = 0; class_idx < outputs->output_dim; class_idx++) { @@ -474,7 +457,7 @@ class GenericInferenceEngine : public AbstractInferenceEngine { const auto& pred = prediction.uplift(); if (outputs->dense_predictions.dimension(1) != pred.treatment_effect_size()) { - return TFStatusInternal("Wrong \"distribution\" shape."); + return absl::InternalError("Wrong \"distribution\" shape."); } for (int uplift_idx = 0; uplift_idx < outputs->output_dim; uplift_idx++) { @@ -484,26 +467,24 @@ class GenericInferenceEngine : public AbstractInferenceEngine { } break; default: - return tf::Status( - static_cast(absl::StatusCode::kUnimplemented), - absl::Substitute("Non supported task $0", - Task_Name(model_->task()))); + return absl::UnimplementedError(absl::Substitute( + "Non supported task $0", Task_Name(model_->task()))); } } - return tf::OkStatus(); + return absl::OkStatus(); } - tf::Status RunInferenceGetLeaves( + absl::Status RunInferenceGetLeaves( const InputTensors& inputs, const FeatureIndex& feature_index, OutputLeavesTensors* outputs, AbstractCache* abstract_cache) const override { // Update the vertical dataset with the input tensors. auto* cache = dynamic_cast(abstract_cache); if (cache == nullptr) { - return TFStatusInternal("Unexpected cache type."); + return absl::InternalError("Unexpected cache type."); } - TF_RETURN_IF_ERROR(SetVerticalDataset(inputs, feature_index, cache)); + RETURN_IF_ERROR(SetVerticalDataset(inputs, feature_index, cache)); // In practice, we want row/batch major, col/tree minor. // Experimentally, this seems to be the case even through the RowMajor bit @@ -516,25 +497,25 @@ class GenericInferenceEngine : public AbstractInferenceEngine { auto* df_model = dynamic_cast(model_.get()); if (df_model == nullptr) { - return TFStatusInvalidArgument("The model is not a decision forest"); + return absl::InvalidArgumentError("The model is not a decision forest"); } // Run the model. for (int example_idx = 0; example_idx < inputs.batch_size; example_idx++) { - TF_RETURN_IF_ERROR(utils::FromUtilStatus(df_model->PredictGetLeaves( + RETURN_IF_ERROR(df_model->PredictGetLeaves( cache->dataset_, example_idx, absl::MakeSpan( outputs->leaves.data() + example_idx * outputs->num_trees, - outputs->num_trees)))); + outputs->num_trees))); } - return tf::OkStatus(); + return absl::OkStatus(); } private: - tf::Status SetVerticalDataset(const InputTensors& inputs, - const FeatureIndex& feature_index, - Cache* cache) const { + absl::Status SetVerticalDataset(const InputTensors& inputs, + const FeatureIndex& feature_index, + Cache* cache) const { cache->dataset_.set_nrow(inputs.batch_size); // Numerical features. for (int col_idx = 0; col_idx < feature_index.numerical_features().size(); @@ -564,7 +545,7 @@ class GenericInferenceEngine : public AbstractInferenceEngine { dataset::NumericalToDiscretizedNumerical(col_spec, value); } } else { - return TFStatusInternal("Unexpected column type."); + return absl::InternalError("Unexpected column type."); } } @@ -575,7 +556,7 @@ class GenericInferenceEngine : public AbstractInferenceEngine { auto* col = cache->dataset_.MutableColumnWithCastOrNull< dataset::VerticalDataset::BooleanColumn>(feature_idx); if (col == nullptr) { - return TFStatusInternal("Unexpected column type."); + return absl::InternalError("Unexpected column type."); } col->Resize(inputs.batch_size); auto& dst = *col->mutable_values(); @@ -601,7 +582,7 @@ class GenericInferenceEngine : public AbstractInferenceEngine { auto* col = cache->dataset_.MutableColumnWithCastOrNull< dataset::VerticalDataset::CategoricalColumn>(feature_idx); if (col == nullptr) { - return TFStatusInternal("Unexpected column type."); + return absl::InternalError("Unexpected column type."); } col->Resize(inputs.batch_size); const int max_value = cache->dataset_.data_spec() @@ -636,7 +617,7 @@ class GenericInferenceEngine : public AbstractInferenceEngine { auto* col = cache->dataset_.MutableColumnWithCastOrNull< dataset::VerticalDataset::CategoricalSetColumn>(feature_idx); if (col == nullptr) { - return TFStatusInternal("Unexpected column type."); + return absl::InternalError("Unexpected column type."); } col->Resize(inputs.batch_size); @@ -647,12 +628,9 @@ class GenericInferenceEngine : public AbstractInferenceEngine { for (int example_idx = 0; example_idx < inputs.batch_size; example_idx++) { - const auto status = - ExtractCategoricalSetInt(inputs, feature_index, col_idx, max_value, - example_idx, &tmp_values); - if (!status.ok()) { - return status; - } + RETURN_IF_ERROR(ExtractCategoricalSetInt(inputs, feature_index, col_idx, + max_value, example_idx, + &tmp_values)); if (!tmp_values.empty() && tmp_values.front() < 0) { col->SetNA(example_idx); @@ -662,7 +640,7 @@ class GenericInferenceEngine : public AbstractInferenceEngine { } } - return tf::OkStatus(); + return absl::OkStatus(); } std::unique_ptr model_; @@ -711,9 +689,9 @@ class SemiFastGenericInferenceEngine : public AbstractInferenceEngine { return cache; } - tf::Status SetInputFeatures(const InputTensors& inputs, - const FeatureIndex& feature_index, - Cache* cache) const { + absl::Status SetInputFeatures(const InputTensors& inputs, + const FeatureIndex& feature_index, + Cache* cache) const { // Allocate a cache of examples. if (cache->num_examples_in_cache_ < inputs.batch_size) { cache->examples_ = engine_->AllocateExamples(inputs.batch_size); @@ -724,17 +702,17 @@ class SemiFastGenericInferenceEngine : public AbstractInferenceEngine { return SetExamples(inputs, feature_index, cache->examples_.get()); } - tf::Status RunInference(const InputTensors& inputs, - const FeatureIndex& feature_index, - OutputTensors* outputs, - AbstractCache* abstract_cache) const override { + absl::Status RunInference(const InputTensors& inputs, + const FeatureIndex& feature_index, + OutputTensors* outputs, + AbstractCache* abstract_cache) const override { // Update the vertical dataset with the input tensors. auto* cache = dynamic_cast(abstract_cache); if (cache == nullptr) { - return TFStatusInternal("Unexpected cache type."); + return absl::InternalError("Unexpected cache type."); } - TF_RETURN_IF_ERROR(SetInputFeatures(inputs, feature_index, cache)); + RETURN_IF_ERROR(SetInputFeatures(inputs, feature_index, cache)); // Run the model. engine_->Predict(*cache->examples_, inputs.batch_size, @@ -744,7 +722,7 @@ class SemiFastGenericInferenceEngine : public AbstractInferenceEngine { if (decompact_probability_) { DCHECK_EQ(outputs->output_dim, 2); if (engine_->NumPredictionDimension() != 1) { - return TFStatusInternal("Wrong NumPredictionDimension"); + return absl::InternalError("Wrong NumPredictionDimension"); } for (int example_idx = 0; example_idx < inputs.batch_size; example_idx++) { @@ -756,7 +734,7 @@ class SemiFastGenericInferenceEngine : public AbstractInferenceEngine { } else { if (engine_->NumPredictionDimension() != outputs->output_dim) { - return TFStatusInternal("Wrong NumPredictionDimension"); + return absl::InternalError("Wrong NumPredictionDimension"); } for (int example_idx = 0; example_idx < inputs.batch_size; example_idx++) { @@ -768,19 +746,19 @@ class SemiFastGenericInferenceEngine : public AbstractInferenceEngine { } } } - return tf::OkStatus(); + return absl::OkStatus(); } - tf::Status RunInferenceGetLeaves( + absl::Status RunInferenceGetLeaves( const InputTensors& inputs, const FeatureIndex& feature_index, OutputLeavesTensors* outputs, AbstractCache* abstract_cache) const override { // Update the vertical dataset with the input tensors. auto* cache = dynamic_cast(abstract_cache); if (cache == nullptr) { - return TFStatusInternal("Unexpected cache type."); + return absl::InternalError("Unexpected cache type."); } - TF_RETURN_IF_ERROR(SetInputFeatures(inputs, feature_index, cache)); + RETURN_IF_ERROR(SetInputFeatures(inputs, feature_index, cache)); static_assert( !(std::remove_pointerleaves)>::type::Options & @@ -788,11 +766,11 @@ class SemiFastGenericInferenceEngine : public AbstractInferenceEngine { "leaves should be row minor"); // Run the model. - TF_RETURN_IF_ERROR(utils::FromUtilStatus(engine_->GetLeaves( + RETURN_IF_ERROR(engine_->GetLeaves( *cache->examples_, inputs.batch_size, - absl::MakeSpan(outputs->leaves.data(), outputs->leaves.size())))); + absl::MakeSpan(outputs->leaves.data(), outputs->leaves.size()))); - return tf::OkStatus(); + return absl::OkStatus(); } private: @@ -876,9 +854,9 @@ class SemiFastGenericInferenceEngine : public AbstractInferenceEngine { // Copy the content of "inputs" into "examples". // "examples" is allocated with at least "inputs.batch_size" examples. - tf::Status SetExamples(const InputTensors& inputs, - const FeatureIndex& feature_index, - serving::AbstractExampleSet* examples) const { + absl::Status SetExamples(const InputTensors& inputs, + const FeatureIndex& feature_index, + serving::AbstractExampleSet* examples) const { const auto& features = engine_->features(); examples->FillMissing(engine_->features()); @@ -932,12 +910,9 @@ class SemiFastGenericInferenceEngine : public AbstractInferenceEngine { .number_of_unique_values(); for (int example_idx = 0; example_idx < inputs.batch_size; example_idx++) { - const auto status = - ExtractCategoricalSetInt(inputs, feature_index, feature.tensor_col, - max_value, example_idx, &tmp_values); - if (!status.ok()) { - return status; - } + RETURN_IF_ERROR(ExtractCategoricalSetInt(inputs, feature_index, + feature.tensor_col, max_value, + example_idx, &tmp_values)); if (!tmp_values.empty() && tmp_values.front() < 0) { examples->SetMissingCategoricalSet(example_idx, @@ -965,7 +940,7 @@ class SemiFastGenericInferenceEngine : public AbstractInferenceEngine { } } - return tf::OkStatus(); + return absl::OkStatus(); } // Inference engine. Contains the model data. @@ -1000,31 +975,31 @@ class YggdrasilModelResource : public tf::ResourceBase { std::string DebugString() const override { return "YggdrasilModelResource"; } // Loads the model from disk. - tf::Status LoadModelFromDisk(const absl::string_view model_path, - const std::string& file_prefix, - const OutputTypesBitmap& output_types = {}, - const bool allow_slow_inference = true) { + absl::Status LoadModelFromDisk(const absl::string_view model_path, + const std::string& file_prefix, + const OutputTypesBitmap& output_types = {}, + const bool allow_slow_inference = true) { std::unique_ptr model; - TF_RETURN_IF_ERROR(utils::FromUtilStatus( - LoadModel(model_path, &model, {/*.file_prefix=*/file_prefix}))); + RETURN_IF_ERROR( + LoadModel(model_path, &model, {/*.file_prefix=*/file_prefix})); task_ = model->task(); - TF_RETURN_IF_ERROR( + RETURN_IF_ERROR( feature_index_.Initialize(model->input_features(), model->data_spec())); - TF_RETURN_IF_ERROR(ComputeDenseColRepresentation(model.get())); + RETURN_IF_ERROR(ComputeDenseColRepresentation(model.get())); if (output_types.leaves) { auto* df_model = dynamic_cast(model.get()); if (df_model == nullptr) { - return TFStatusInvalidArgument("The model is not a decision forest"); + return absl::InvalidArgumentError("The model is not a decision forest"); } num_trees_ = df_model->num_trees(); } // WARNING: After this function, the "model" might not be available anymore. - TF_RETURN_IF_ERROR(CreateInferenceEngine(output_types, allow_slow_inference, - std::move(model))); - return tf::OkStatus(); + RETURN_IF_ERROR(CreateInferenceEngine(output_types, allow_slow_inference, + std::move(model))); + return absl::OkStatus(); } const AbstractInferenceEngine* engine() const { @@ -1044,7 +1019,7 @@ class YggdrasilModelResource : public tf::ResourceBase { private: // Creates an inference engine compatible with the model. The inference engine // can take ownership of the abstract model data. - tf::Status CreateInferenceEngine( + absl::Status CreateInferenceEngine( const OutputTypesBitmap& output_types, const bool allow_slow_inference, std::unique_ptr model) { // Currently, none of the fast engines support leaves output. @@ -1055,16 +1030,14 @@ class YggdrasilModelResource : public tf::ResourceBase { auto inference_engine_or_status = SemiFastGenericInferenceEngine::Create( std::move(semi_fast_engine.value()), *model, feature_index()); - TF_RETURN_IF_ERROR( - utils::FromUtilStatus(inference_engine_or_status.status())); + RETURN_IF_ERROR(inference_engine_or_status.status()); inference_engine_ = std::move(inference_engine_or_status.value()); LOG(INFO) << "Use fast generic engine"; - return tf::OkStatus(); + return absl::OkStatus(); } if (!allow_slow_inference) { - return ::tensorflow::Status( - static_cast(absl::StatusCode::kUnknown), + return absl::UnknownError( "No compatible fast inference engine found for the model. Options: " "1) Make sure this binary is compiled with support with compatible " "fast inference engines. 2) Allow for the model to run with the " @@ -1079,12 +1052,12 @@ class YggdrasilModelResource : public tf::ResourceBase { LOG(INFO) << "Use slow generic engine"; inference_engine_ = absl::make_unique(std::move(model)); - return tf::OkStatus(); + return absl::OkStatus(); } // Pre-compute the values returned in the "dense_col_representation" output of // the inference OPs. - tf::Status ComputeDenseColRepresentation( + absl::Status ComputeDenseColRepresentation( const model::AbstractModel* const model) { if (task_ == Task::CLASSIFICATION) { const auto& label_spec = @@ -1108,7 +1081,7 @@ class YggdrasilModelResource : public tf::ResourceBase { } else { dense_col_representation_.resize(1); } - return tf::OkStatus(); + return absl::OkStatus(); } // The engine responsible to run the model. @@ -1131,19 +1104,19 @@ class YggdrasilModelResource : public tf::ResourceBase { // Gets an existing model resource from a resource handle. Increases ref on the // resource. Returns a failed status if the resource does not exist. -tf::Status GetModelResourceFromResourceHandle( +absl::Status GetModelResourceFromResourceHandle( OpKernelContext* ctx, YggdrasilModelResource** model_resource) { const Tensor* handle_tensor; - TF_RETURN_IF_ERROR(ctx->input(kInputModelHandle, &handle_tensor)); + RETURN_IF_ERROR(ctx->input(kInputModelHandle, &handle_tensor)); const tf::ResourceHandle& handle = handle_tensor->scalar()(); return LookupResource(ctx, handle, model_resource); } // Get the model path at execution time. -tf::Status GetModelPath(OpKernelContext* ctx, std::string* model_path) { +absl::Status GetModelPath(OpKernelContext* ctx, std::string* model_path) { const Tensor* model_path_tensor; - TF_RETURN_IF_ERROR(ctx->input(kInputPath, &model_path_tensor)); + RETURN_IF_ERROR(ctx->input(kInputPath, &model_path_tensor)); const auto model_paths = model_path_tensor->flat(); if (model_paths.size() != 1) { @@ -1152,7 +1125,7 @@ tf::Status GetModelPath(OpKernelContext* ctx, std::string* model_path) { kInputPath)); } *model_path = model_paths(0); - return tf::OkStatus(); + return absl::OkStatus(); } // Load the model from disk into a resource specified as resource name. @@ -1288,8 +1261,7 @@ class SimpleMLInferenceOp : public OpKernel { auto engine_cache_or_status = GetEngineCache(model_resource); if (!engine_cache_or_status.ok()) { - OP_REQUIRES_OK(ctx, - utils::FromUtilStatus(engine_cache_or_status.status())); + OP_REQUIRES_OK(ctx, engine_cache_or_status.status()); } if (output_type_ == OutputType::kLeaves) { @@ -1312,7 +1284,7 @@ class SimpleMLInferenceOp : public OpKernel { const auto& reps = model_resource->dense_col_representation(); if (reps.size() != dense_output_dim_) { OP_REQUIRES_OK( - ctx, TFStatusInvalidArgument(absl::StrCat( + ctx, absl::InvalidArgumentError(absl::StrCat( "The \"dense_output_dim\"=", dense_output_dim_, " attribute does not match the model output dimension=", reps.size()))); @@ -1327,7 +1299,7 @@ class SimpleMLInferenceOp : public OpKernel { engine_cache_or_status.value().get())); } else { OP_REQUIRES_OK(ctx, - TFStatusInvalidArgument("Not implemented output type")); + absl::InvalidArgumentError("Not implemented output type")); } ReturnEngineCache(std::move(engine_cache_or_status).value()); @@ -1374,7 +1346,7 @@ class SimpleMLInferenceOp : public OpKernel { const auto lookup_status = ctx->resource_manager()->Lookup( kModelContainer, model_identifier_, &res); if (!lookup_status.ok()) { - return tf::Status( + return absl::Status( lookup_status.code(), absl::StrCat(lookup_status.message(), ". This error caused the simpleML model not to be " @@ -1389,8 +1361,8 @@ class SimpleMLInferenceOp : public OpKernel { // // All the input feature are expected to have a first dimension of zero // (unused) or equal to the batch size. - tf::Status ComputeBatchSize(const InputTensors& input_tensors, - int* batch_size) { + absl::Status ComputeBatchSize(const InputTensors& input_tensors, + int* batch_size) { int max_size = 0; for (const int size : {input_tensors.numerical_features.dimension(0), @@ -1403,14 +1375,14 @@ class SimpleMLInferenceOp : public OpKernel { if (max_size == 0) { max_size = size; } else if (max_size != size) { - return TFStatusInvalidArgument(absl::StrCat( + return absl::InvalidArgumentError(absl::StrCat( "The batch size of the input features are inconsistent: ", max_size, " vs ", size, ".")); } } } *batch_size = max_size; - return tf::OkStatus(); + return absl::OkStatus(); } // Gets the c++ references on all the input tensor values of the inference op. @@ -1445,25 +1417,25 @@ class SimpleMLInferenceOp : public OpKernel { categorical_set_int_features_row_splits_dim_2_tensor}; // Set the batch size from the tensors. - TF_RETURN_IF_ERROR(ComputeBatchSize(tensors, &tensors.batch_size)); + RETURN_IF_ERROR(ComputeBatchSize(tensors, &tensors.batch_size)); // Check number of dimensions of inputs. // Note: The user cannot impact those if using the wrapper. if (tensors.numerical_features.dimension(1) != feature_index.numerical_features().size()) { - return TFStatusInvalidArgument( + return absl::InvalidArgumentError( "Unexpected dimension of numerical_features bank."); } if (tensors.boolean_features.dimension(1) != feature_index.boolean_features().size()) { - return TFStatusInvalidArgument( + return absl::InvalidArgumentError( "Unexpected dimension of boolean_features bank."); } if (tensors.categorical_int_features.dimension(1) != feature_index.categorical_int_features().size()) { - return TFStatusInvalidArgument( + return absl::InvalidArgumentError( "Unexpected dimension of categorical_int_features bank."); } @@ -1477,13 +1449,13 @@ class SimpleMLInferenceOp : public OpKernel { Tensor* dense_predictions_tensor = nullptr; Tensor* dense_col_representation_tensor = nullptr; - TF_RETURN_IF_ERROR(ctx->allocate_output( + RETURN_IF_ERROR(ctx->allocate_output( kOutputDensePredictions, TensorShape({batch_size, dense_output_dim_}), &dense_predictions_tensor)); - TF_RETURN_IF_ERROR(ctx->allocate_output(kOutputDenseColRepresentation, - TensorShape({dense_output_dim_}), - &dense_col_representation_tensor)); + RETURN_IF_ERROR(ctx->allocate_output(kOutputDenseColRepresentation, + TensorShape({dense_output_dim_}), + &dense_col_representation_tensor)); return OutputTensors{dense_predictions_tensor, dense_col_representation_tensor, dense_output_dim_}; @@ -1493,7 +1465,7 @@ class SimpleMLInferenceOp : public OpKernel { absl::StatusOr LinkOutputLeavesTensors( OpKernelContext* ctx, const int batch_size, const int num_trees) { Tensor* leaves_tensor = nullptr; - TF_RETURN_IF_ERROR(ctx->allocate_output( + RETURN_IF_ERROR(ctx->allocate_output( kOutputLeaves, TensorShape({batch_size, num_trees}), &leaves_tensor)); return OutputLeavesTensors{leaves_tensor, num_trees}; } @@ -1578,7 +1550,7 @@ class SimpleMLInferenceOpWithHandle : public SimpleMLInferenceOp { absl::StatusOr ImportModelResource( OpKernelContext* ctx) override { YggdrasilModelResource* res; - TF_RETURN_IF_ERROR(GetModelResourceFromResourceHandle(ctx, &res)); + RETURN_IF_ERROR(GetModelResourceFromResourceHandle(ctx, &res)); return res; } }; @@ -1598,7 +1570,7 @@ class SimpleMLInferenceLeafIndexOpWithHandle : public SimpleMLInferenceOp { absl::StatusOr ImportModelResource( OpKernelContext* ctx) override { YggdrasilModelResource* res; - TF_RETURN_IF_ERROR(GetModelResourceFromResourceHandle(ctx, &res)); + RETURN_IF_ERROR(GetModelResourceFromResourceHandle(ctx, &res)); return res; } }; @@ -1651,7 +1623,7 @@ class SimpleMLCreateModelResource : public OpKernel { container->MemoryUsed() + model_handle_.AllocatedBytes()); } *ret = container; - return tf::OkStatus(); + return absl::OkStatus(); }; // Creates the model resource. diff --git a/tensorflow_decision_forests/tensorflow/ops/inference/op.cc b/tensorflow_decision_forests/tensorflow/ops/inference/op.cc index 0745fe6c..e34f938e 100644 --- a/tensorflow_decision_forests/tensorflow/ops/inference/op.cc +++ b/tensorflow_decision_forests/tensorflow/ops/inference/op.cc @@ -29,7 +29,9 @@ #include "tensorflow/core/framework/op.h" +#include "absl/status/status.h" #include "tensorflow/core/framework/common_shape_fns.h" +#include "yggdrasil_decision_forests/utils/status_macros.h" namespace tensorflow { @@ -54,7 +56,7 @@ path: Path to the Yggdrasil model. Note: a Yggdrasil model directory should Returns a type-less OP that loads the model when called. )") .SetShapeFn([](::tensorflow::shape_inference::InferenceContext* c) { - return OkStatus(); + return absl::OkStatus(); }); REGISTER_OP("SimpleMLLoadModelFromPathWithHandle") @@ -86,19 +88,19 @@ allow_slow_inference: The model inference engine is selected automatically run with a fast inference engine. )") .SetShapeFn([](::tensorflow::shape_inference::InferenceContext* c) { - return OkStatus(); + return absl::OkStatus(); }); -Status SimpleMLInferenceOpSetShapeGeneric(shape_inference::InferenceContext* c, +absl::Status SimpleMLInferenceOpSetShapeGeneric(shape_inference::InferenceContext* c, const bool output_leaves) { // Check the rank of the input features. ::tensorflow::shape_inference::ShapeHandle tmp_shape; - TF_RETURN_IF_ERROR(c->WithRank(c->input(0), 2, &tmp_shape)); - TF_RETURN_IF_ERROR(c->WithRank(c->input(1), 2, &tmp_shape)); - TF_RETURN_IF_ERROR(c->WithRank(c->input(2), 2, &tmp_shape)); - TF_RETURN_IF_ERROR(c->WithRank(c->input(3), 1, &tmp_shape)); - TF_RETURN_IF_ERROR(c->WithRank(c->input(4), 1, &tmp_shape)); - TF_RETURN_IF_ERROR(c->WithRank(c->input(5), 1, &tmp_shape)); + RETURN_IF_ERROR(c->WithRank(c->input(0), 2, &tmp_shape)); + RETURN_IF_ERROR(c->WithRank(c->input(1), 2, &tmp_shape)); + RETURN_IF_ERROR(c->WithRank(c->input(2), 2, &tmp_shape)); + RETURN_IF_ERROR(c->WithRank(c->input(3), 1, &tmp_shape)); + RETURN_IF_ERROR(c->WithRank(c->input(4), 1, &tmp_shape)); + RETURN_IF_ERROR(c->WithRank(c->input(5), 1, &tmp_shape)); // Get the output batch dimension from the batch dimension of the input // features. Input batch dimension can be set or unknown. The special @@ -125,8 +127,7 @@ Status SimpleMLInferenceOpSetShapeGeneric(shape_inference::InferenceContext* c, if (known_batch_size == -1) { known_batch_size = value; } else if (known_batch_size != value) { - return Status( - static_cast(absl::StatusCode::kInvalidArgument), + return absl::InvalidArgumentError( "The batch size of the input features are inconsistent"); } } @@ -137,22 +138,22 @@ Status SimpleMLInferenceOpSetShapeGeneric(shape_inference::InferenceContext* c, } } if (output_leaves) { - TF_RETURN_IF_ERROR( + RETURN_IF_ERROR( c->set_output("leaves", {c->Matrix(batch_size, c->UnknownDim())})); } else { int dense_output_dim; - TF_RETURN_IF_ERROR(c->GetAttr("dense_output_dim", &dense_output_dim)); + RETURN_IF_ERROR(c->GetAttr("dense_output_dim", &dense_output_dim)); // Check the tensor shapes. - TF_RETURN_IF_ERROR(c->set_output( - "dense_predictions", {c->Matrix(batch_size, dense_output_dim)})); - TF_RETURN_IF_ERROR(c->set_output("dense_col_representation", - {c->Vector(dense_output_dim)})); + RETURN_IF_ERROR(c->set_output("dense_predictions", + {c->Matrix(batch_size, dense_output_dim)})); + RETURN_IF_ERROR(c->set_output("dense_col_representation", + {c->Vector(dense_output_dim)})); } - return OkStatus(); + return absl::OkStatus(); } -Status SimpleMLInferenceOpSetShape(shape_inference::InferenceContext* c) { +absl::Status SimpleMLInferenceOpSetShape(shape_inference::InferenceContext* c) { return SimpleMLInferenceOpSetShapeGeneric(c, /*output_leaves=*/false); } @@ -228,7 +229,7 @@ REGISTER_OP("SimpleMLInferenceOpWithHandle") .Output("dense_col_representation: string") .SetShapeFn(SimpleMLInferenceOpSetShape); -Status SimpleMLInferenceOpSetShapeLeafIndex( +absl::Status SimpleMLInferenceOpSetShapeLeafIndex( shape_inference::InferenceContext* c) { return SimpleMLInferenceOpSetShapeGeneric(c, /*output_leaves=*/true); } @@ -242,9 +243,9 @@ REGISTER_OP("SimpleMLInferenceLeafIndexOpWithHandle") .Output("leaves: int32") .SetShapeFn(SimpleMLInferenceOpSetShapeLeafIndex); -Status ScalarOutput(shape_inference::InferenceContext* c) { +absl::Status ScalarOutput(shape_inference::InferenceContext* c) { c->set_output(0, c->Scalar()); - return OkStatus(); + return absl::OkStatus(); } REGISTER_OP("SimpleMLCreateModelResource") diff --git a/tensorflow_decision_forests/tensorflow/ops/training/BUILD b/tensorflow_decision_forests/tensorflow/ops/training/BUILD index 361105e3..f9137424 100644 --- a/tensorflow_decision_forests/tensorflow/ops/training/BUILD +++ b/tensorflow_decision_forests/tensorflow/ops/training/BUILD @@ -94,12 +94,17 @@ cc_library( "//conditions:default": [], }), deps = [ + "@com_google_absl//absl/base:core_headers", + "@com_google_absl//absl/container:flat_hash_map", "@com_google_absl//absl/log", + "@com_google_absl//absl/log:check", + "@com_google_absl//absl/memory", "@com_google_absl//absl/random", "@com_google_absl//absl/status", "@com_google_absl//absl/status:statusor", "@com_google_absl//absl/strings", "@com_google_absl//absl/types:optional", + "@com_google_absl//absl/types:span", "@ydf//yggdrasil_decision_forests/dataset:data_spec", "@ydf//yggdrasil_decision_forests/dataset:data_spec_cc_proto", "@ydf//yggdrasil_decision_forests/dataset:data_spec_inference", @@ -118,8 +123,8 @@ cc_library( "@ydf//yggdrasil_decision_forests/utils:concurrency", "@ydf//yggdrasil_decision_forests/utils:distribution_cc_proto", "@ydf//yggdrasil_decision_forests/utils:logging", + "@ydf//yggdrasil_decision_forests/utils:status_macros", "@ydf//yggdrasil_decision_forests/utils:synchronization_primitives", - "@ydf//yggdrasil_decision_forests/utils:tensorflow", "@ydf//yggdrasil_decision_forests/utils/distribute/implementations/grpc:grpc_manager", "@ydf//yggdrasil_decision_forests/utils/distribute/implementations/grpc:grpc_worker", ] + select({ diff --git a/tensorflow_decision_forests/tensorflow/ops/training/feature_on_file.cc b/tensorflow_decision_forests/tensorflow/ops/training/feature_on_file.cc index 65a497ff..28517c2e 100644 --- a/tensorflow_decision_forests/tensorflow/ops/training/feature_on_file.cc +++ b/tensorflow_decision_forests/tensorflow/ops/training/feature_on_file.cc @@ -15,16 +15,28 @@ #include "tensorflow_decision_forests/tensorflow/ops/training/feature_on_file.h" +#include +#include +#include +#include +#include + #include "absl/log/log.h" +#include "absl/memory/memory.h" +#include "absl/status/status.h" +#include "absl/strings/str_cat.h" +#include "absl/types/span.h" +#include "tensorflow_decision_forests/tensorflow/ops/training/features.h" +#include "yggdrasil_decision_forests/learner/distributed_decision_tree/dataset_cache/column_cache.h" #include "yggdrasil_decision_forests/learner/distributed_decision_tree/dataset_cache/dataset_cache_common.h" #include "yggdrasil_decision_forests/utils/logging.h" +#include "yggdrasil_decision_forests/utils/status_macros.h" namespace tensorflow_decision_forests { namespace ops { namespace { namespace tf = ::tensorflow; -namespace utils = ::yggdrasil_decision_forests::utils; namespace dist_dt = ::yggdrasil_decision_forests::model::distributed_decision_tree; @@ -60,19 +72,18 @@ absl::Status AbstractFeatureResourceOnFile::End() { PartialColumnShardMetadata meta_data; RETURN_IF_ERROR(EndImp(&meta_data)); - RETURN_IF_ERROR(utils::ToUtilStatus(WriteBinaryProto( + RETURN_IF_ERROR(WriteBinaryProto( tensorflow::Env::Default(), absl::StrCat( PartialRawColumnFilePath(dataset_path_, feature_idx_, worker_idx_), kFilenameMetaDataPostfix), - meta_data))); + meta_data)); return absl::OkStatus(); } absl::Status NumericalResourceOnFile::Begin() { - RETURN_IF_ERROR( - utils::ToUtilStatus(tensorflow::Env::Default()->RecursivelyCreateDir( - PartialRawColumnFileDirectory(dataset_path_, feature_idx_)))); + RETURN_IF_ERROR(tensorflow::Env::Default()->RecursivelyCreateDir( + PartialRawColumnFileDirectory(dataset_path_, feature_idx_))); writer_ = absl::make_unique(); return writer_->Open( @@ -122,9 +133,8 @@ absl::Status NumericalResourceOnFile::EndImp( } absl::Status CategoricalResourceOnFile::Begin() { - RETURN_IF_ERROR( - utils::ToUtilStatus(tensorflow::Env::Default()->RecursivelyCreateDir( - PartialRawColumnFileDirectory(dataset_path_, feature_idx_)))); + RETURN_IF_ERROR(tensorflow::Env::Default()->RecursivelyCreateDir( + PartialRawColumnFileDirectory(dataset_path_, feature_idx_))); writer_ = absl::make_unique(); return writer_->Open( @@ -163,9 +173,8 @@ absl::Status CategoricalResourceOnFile::EndImp( } absl::Status CategoricalStringResourceOnFile::Begin() { - RETURN_IF_ERROR( - utils::ToUtilStatus(tensorflow::Env::Default()->RecursivelyCreateDir( - PartialRawColumnFileDirectory(dataset_path_, feature_idx_)))); + RETURN_IF_ERROR(tensorflow::Env::Default()->RecursivelyCreateDir( + PartialRawColumnFileDirectory(dataset_path_, feature_idx_))); writer_ = absl::make_unique(); return writer_->Open( @@ -247,7 +256,7 @@ void SimpleMLWorkerFinalizeFeatureOnFile::Compute( "finalization."))); return; } - OP_REQUIRES_OK(ctx, utils::FromUtilStatus(abstract_resource->End())); + OP_REQUIRES_OK(ctx, abstract_resource->End()); abstract_resource->Unref(); } } diff --git a/tensorflow_decision_forests/tensorflow/ops/training/feature_on_file.h b/tensorflow_decision_forests/tensorflow/ops/training/feature_on_file.h index b281c9ec..de9375be 100644 --- a/tensorflow_decision_forests/tensorflow/ops/training/feature_on_file.h +++ b/tensorflow_decision_forests/tensorflow/ops/training/feature_on_file.h @@ -31,6 +31,15 @@ #ifndef TENSORFLOW_DECISION_FORESTS_TENSORFLOW_OPS_TRAINING_FEATURE_ON_FILE_H_ #define TENSORFLOW_DECISION_FORESTS_TENSORFLOW_OPS_TRAINING_FEATURE_ON_FILE_H_ +#include +#include +#include +#include +#include + +#include "absl/container/flat_hash_map.h" +#include "absl/log/log.h" +#include "absl/status/status.h" #include "tensorflow/core/framework/device.h" #include "tensorflow/core/framework/op_kernel.h" #include "tensorflow/core/framework/resource_mgr.h" @@ -39,7 +48,7 @@ #include "yggdrasil_decision_forests/learner/distributed_decision_tree/dataset_cache/column_cache.h" #include "yggdrasil_decision_forests/learner/distributed_decision_tree/dataset_cache/dataset_cache.pb.h" #include "yggdrasil_decision_forests/model/abstract_model.h" -#include "yggdrasil_decision_forests/utils/tensorflow.h" +#include "yggdrasil_decision_forests/utils/synchronization_primitives.h" namespace tensorflow_decision_forests { namespace ops { @@ -49,7 +58,7 @@ namespace ops { constexpr char kFilenameDone[] = "partial_done"; // Create a done file in "dataset_path". -::tensorflow::Status CreateDoneFile(const std::string& dataset_path); +absl::Status CreateDoneFile(const std::string& dataset_path); // Checks is a done file exist in "dataset_path". bool HasDoneFile(const std::string& dataset_path); @@ -212,9 +221,7 @@ class FeatureOnFileOp : public tensorflow::OpKernel { auto* device = dynamic_cast(ctx->device()); if (device == nullptr) { OP_REQUIRES_OK(ctx, - tensorflow::Status(static_cast( - absl::StatusCode::kInvalidArgument), - "Cannot find the worker idx")); + absl::InvalidArgumentError("Cannot find the worker idx")); } worker_idx_ = device->parsed_name().task; @@ -234,17 +241,14 @@ class FeatureOnFileOp : public tensorflow::OpKernel { const std::string& feature_name() const { return feature_name_; } void Compute(tensorflow::OpKernelContext* ctx) override { - using ::yggdrasil_decision_forests::utils::FromUtilStatus; - if (dataset_already_on_disk_) { return; } tensorflow::mutex_lock l(mu_); - OP_REQUIRES(ctx, ctx->input(0).dims() == 1, - tensorflow::Status(static_cast( - absl::StatusCode::kInvalidArgument), - "The input 0 feature should have rank 1")); + OP_REQUIRES( + ctx, ctx->input(0).dims() == 1, + absl::InvalidArgumentError("The input 0 feature should have rank 1")); if (!resource_) { AbstractFeatureResourceOnFile* abstract_resource; OP_REQUIRES_OK( @@ -252,18 +256,16 @@ class FeatureOnFileOp : public tensorflow::OpKernel { ->LookupOrCreate( kModelContainer, resource_id_, &abstract_resource, [&](AbstractFeatureResourceOnFile** resource) - -> tensorflow::Status { + -> absl::Status { *resource = new Resource(feature_idx_, feature_name_, dataset_path_, worker_idx_); - return FromUtilStatus((*resource)->Begin()); + return (*resource)->Begin(); })); resource_ = static_cast(abstract_resource); } OP_REQUIRES(ctx, ctx->input(0).dims() == 1, - tensorflow::Status(static_cast( - absl::StatusCode::kInvalidArgument), - "The input should have rank 1")); - OP_REQUIRES_OK(ctx, FromUtilStatus(resource_->AddValue(ctx->input(0)))); + absl::InvalidArgumentError("The input should have rank 1")); + OP_REQUIRES_OK(ctx, resource_->AddValue(ctx->input(0))); } private: diff --git a/tensorflow_decision_forests/tensorflow/ops/training/features.h b/tensorflow_decision_forests/tensorflow/ops/training/features.h index 5fb1ccfa..12a1893c 100644 --- a/tensorflow_decision_forests/tensorflow/ops/training/features.h +++ b/tensorflow_decision_forests/tensorflow/ops/training/features.h @@ -18,11 +18,23 @@ #ifndef TENSORFLOW_DECISION_FORESTS_TENSORFLOW_OPS_TRAINING_FEATURES_H_ #define TENSORFLOW_DECISION_FORESTS_TENSORFLOW_OPS_TRAINING_FEATURES_H_ +#include +#include +#include +#include +#include +#include + +#include "absl/base/thread_annotations.h" +#include "absl/container/flat_hash_map.h" #include "absl/log/log.h" +#include "absl/status/status.h" +#include "absl/strings/str_cat.h" #include "tensorflow/core/framework/op_kernel.h" #include "tensorflow/core/framework/resource_mgr.h" -#include "yggdrasil_decision_forests/model/abstract_model.h" -#include "yggdrasil_decision_forests/utils/tensorflow.h" +#include "yggdrasil_decision_forests/dataset/data_spec.h" +#include "yggdrasil_decision_forests/dataset/vertical_dataset.h" +#include "yggdrasil_decision_forests/utils/synchronization_primitives.h" namespace tensorflow_decision_forests { namespace ops { @@ -187,28 +199,24 @@ class Feature : public tensorflow::OpKernel { ctx, ctx->resource_manager() ->LookupOrCreate( kModelContainer, identifier_, &tmp_abstract_resource, - [&](AbstractFeatureResource** resource) - -> tensorflow::Status { + [&](AbstractFeatureResource** resource) -> absl::Status { *resource = new Resource(feature_name_); - return tensorflow::Status(); + return absl::Status(); })); resource_ = static_cast(tmp_abstract_resource); } if constexpr (kNumInputs == 1) { - OP_REQUIRES(ctx, ctx->input(0).dims() == 1, - tensorflow::Status(static_cast( - absl::StatusCode::kInvalidArgument), - "The input 0 feature should have rank 1")); + OP_REQUIRES( + ctx, ctx->input(0).dims() == 1, + absl::InvalidArgumentError("The input 0 feature should have rank 1")); resource_->Add(ctx->input(0)); } else if constexpr (kNumInputs == 2) { - OP_REQUIRES(ctx, ctx->input(0).dims() == 1, - tensorflow::Status(static_cast( - absl::StatusCode::kInvalidArgument), - "The input 0 feature should have rank 1")); - OP_REQUIRES(ctx, ctx->input(1).dims() == 1, - tensorflow::Status(static_cast( - absl::StatusCode::kInvalidArgument), - "The input 1 feature should have rank 1")); + OP_REQUIRES( + ctx, ctx->input(0).dims() == 1, + absl::InvalidArgumentError("The input 0 feature should have rank 1")); + OP_REQUIRES( + ctx, ctx->input(1).dims() == 1, + absl::InvalidArgumentError("The input 1 feature should have rank 1")); resource_->Add(ctx->input(0), ctx->input(1)); } else { LOG(FATAL) << "Invalid dimensions"; @@ -364,20 +372,20 @@ class FeatureSet { // existing_dataspec: Optional existing dataspec to use as guide for the // dataspec feature idx. // dataset_type: The function of the dataset. - tensorflow::Status Link( + absl::Status Link( tensorflow::OpKernelContext* ctx, const std::vector& column_ids, - const ::yggdrasil_decision_forests::dataset::proto:: - DataSpecification* const existing_dataspec, - const DatasetType dataset_type = DatasetType::kTraining); + const ::yggdrasil_decision_forests::dataset::proto::DataSpecification* + existing_dataspec, + DatasetType dataset_type = DatasetType::kTraining); - tensorflow::Status Unlink(); + absl::Status Unlink(); template - using FeatureIterator = std::function; + using FeatureIterator = + std::function; - tensorflow::Status IterateFeatures( + absl::Status IterateFeatures( FeatureIterator lambda_numerical, FeatureIterator lambda_categorical_string, @@ -390,7 +398,7 @@ class FeatureSet { // Initialize a dataset (including the dataset's dataspec) from the linked // resource aggregators. - tensorflow::Status InitializeDatasetFromFeatures( + absl::Status InitializeDatasetFromFeatures( tensorflow::OpKernelContext* ctx, const ::yggdrasil_decision_forests::dataset::proto:: DataSpecificationGuide& guide, @@ -398,7 +406,7 @@ class FeatureSet { // Moves the feature values contained in the aggregators into the dataset. // Following this call, the feature aggregators are empty. - tensorflow::Status MoveExamplesFromFeaturesToDataset( + absl::Status MoveExamplesFromFeaturesToDataset( tensorflow::OpKernelContext* ctx, ::yggdrasil_decision_forests::dataset::VerticalDataset* dataset); diff --git a/tensorflow_decision_forests/tensorflow/ops/training/kernel.cc b/tensorflow_decision_forests/tensorflow/ops/training/kernel.cc index 80b17143..041cc8b1 100644 --- a/tensorflow_decision_forests/tensorflow/ops/training/kernel.cc +++ b/tensorflow_decision_forests/tensorflow/ops/training/kernel.cc @@ -18,15 +18,21 @@ #include "tensorflow_decision_forests/tensorflow/ops/training/kernel.h" #include +#ifdef TFDF_STOP_TRAINING_ON_INTERRUPT #include +#endif #include #include #include #include +#include #include "absl/log/log.h" +#include "absl/memory/memory.h" +#include "absl/status/status.h" #include "absl/status/statusor.h" #include "absl/strings/str_cat.h" +#include "absl/strings/string_view.h" #include "absl/strings/substitute.h" #include "absl/types/optional.h" #include "tensorflow/core/framework/op_kernel.h" @@ -43,7 +49,7 @@ #include "yggdrasil_decision_forests/model/decision_tree/decision_forest_interface.h" #include "yggdrasil_decision_forests/model/model_library.h" #include "yggdrasil_decision_forests/utils/logging.h" -#include "yggdrasil_decision_forests/utils/tensorflow.h" +#include "yggdrasil_decision_forests/utils/status_macros.h" namespace tensorflow_decision_forests { namespace ops { @@ -51,14 +57,13 @@ namespace { namespace tf = ::tensorflow; namespace ydf = ::yggdrasil_decision_forests; namespace model = ydf::model; -namespace utils = ydf::utils; namespace dataset = ydf::dataset; } // namespace absl::Status YggdrasilModelContainer::LoadModel( const absl::string_view model_path) { - TF_RETURN_IF_ERROR_FROM_ABSL_STATUS(model::LoadModel(model_path, &model_)); + RETURN_IF_ERROR(model::LoadModel(model_path, &model_)); // Cache label information. const auto& label_spec = model_->data_spec().columns(model_->label_col_idx()); num_label_classes_ = label_spec.categorical().number_of_unique_values(); @@ -135,10 +140,10 @@ REGISTER_KERNEL_BUILDER(Name("SimpleMLHashFeature").Device(tf::DEVICE_CPU), FeatureSet::~FeatureSet() { Unlink().IgnoreError(); } absl::Status FeatureSet::Link( - tf::OpKernelContext* ctx, const std::vector& resource_ids, + tf::OpKernelContext* ctx, const std::vector& column_ids, const dataset::proto::DataSpecification* const existing_dataspec, const DatasetType dataset_type) { - std::vector sorted_resource_ids = resource_ids; + std::vector sorted_resource_ids = column_ids; std::sort(sorted_resource_ids.begin(), sorted_resource_ids.end()); for (const auto& column_id : sorted_resource_ids) { @@ -153,9 +158,12 @@ absl::Status FeatureSet::Link( } AbstractFeatureResource* feature; - TF_RETURN_IF_ERROR( + const auto lookup_status = ctx->resource_manager()->Lookup( - kModelContainer, resource_id, &feature)); + kModelContainer, resource_id, &feature); + if (!lookup_status.ok()) { + return lookup_status; + } const int feature_idx = existing_dataspec ? dataset::GetColumnIdxFromName( @@ -212,37 +220,35 @@ absl::Status FeatureSet::IterateFeatures( FeatureIterator lambda_hash) { for (auto& feature : numerical_features_) { tf::mutex_lock l(*feature.second->mutable_mutex()); - TF_RETURN_IF_ERROR(lambda_numerical(feature.second, feature.first)); + RETURN_IF_ERROR(lambda_numerical(feature.second, feature.first)); } for (auto& feature : categorical_string_features_) { tf::mutex_lock l(*feature.second->mutable_mutex()); - TF_RETURN_IF_ERROR( - lambda_categorical_string(feature.second, feature.first)); + RETURN_IF_ERROR(lambda_categorical_string(feature.second, feature.first)); } for (auto& feature : categorical_int_features_) { tf::mutex_lock l(*feature.second->mutable_mutex()); - TF_RETURN_IF_ERROR(lambda_categorical_int(feature.second, feature.first)); + RETURN_IF_ERROR(lambda_categorical_int(feature.second, feature.first)); } for (auto& feature : categorical_set_string_features_) { tf::mutex_lock l(*feature.second->mutable_mutex()); - TF_RETURN_IF_ERROR( + RETURN_IF_ERROR( lambda_categorical_set_string(feature.second, feature.first)); } for (auto& feature : categorical_set_int_features_) { tf::mutex_lock l(*feature.second->mutable_mutex()); - TF_RETURN_IF_ERROR( - lambda_categorical_set_int(feature.second, feature.first)); + RETURN_IF_ERROR(lambda_categorical_set_int(feature.second, feature.first)); } for (auto& feature : hash_features_) { tf::mutex_lock l(*feature.second->mutable_mutex()); - TF_RETURN_IF_ERROR(lambda_hash(feature.second, feature.first)); + RETURN_IF_ERROR(lambda_hash(feature.second, feature.first)); } return absl::OkStatus(); } absl::Status FeatureSet::Unlink() { - TF_RETURN_IF_ERROR(IterateFeatures( + RETURN_IF_ERROR(IterateFeatures( [](SimpleMLNumericalFeature::Resource* feature, const int feature_idx) { feature->Unref(); return absl::OkStatus(); @@ -316,7 +322,7 @@ absl::Status FeatureSet::InitializeDatasetFromFeatures( dataset::proto::Column* col, const bool apply_type = false) -> absl::Status { dataset::proto::ColumnGuide col_guide; - TF_RETURN_IF_ERROR_FROM_ABSL_STATUS( + RETURN_IF_ERROR( dataset::BuildColumnGuide(feature_name, guide, &col_guide).status()); if (apply_type) { if (col_guide.has_type()) { @@ -328,35 +334,35 @@ absl::Status FeatureSet::InitializeDatasetFromFeatures( } } } - return utils::FromUtilStatus( - dataset::UpdateSingleColSpecWithGuideInfo(col_guide, col)); + return dataset::UpdateSingleColSpecWithGuideInfo(col_guide, col); }; - TF_RETURN_IF_ERROR(IterateFeatures( - [&](SimpleMLNumericalFeature::Resource* feature, const int feature_idx) { + RETURN_IF_ERROR(IterateFeatures( + [&](SimpleMLNumericalFeature::Resource* feature, + const int feature_idx) -> absl::Status { auto* col = dataset->mutable_data_spec()->mutable_columns(feature_idx); col->set_name(feature->feature_name()); col->set_type(dataset::proto::ColumnType::NUMERICAL); - TF_RETURN_IF_ERROR( + RETURN_IF_ERROR( apply_guide(feature->feature_name(), col, /*apply_type=*/true)); return set_num_examples(feature->data().size(), feature->NumBatches()); }, [&](SimpleMLCategoricalStringFeature::Resource* feature, - const int feature_idx) { + const int feature_idx) -> absl::Status { auto* col = dataset->mutable_data_spec()->mutable_columns(feature_idx); col->set_name(feature->feature_name()); col->set_type(dataset::proto::ColumnType::CATEGORICAL); - TF_RETURN_IF_ERROR(apply_guide(feature->feature_name(), col)); - TF_RETURN_IF_ERROR(set_num_examples(feature->indexed_data().size(), - feature->NumBatches())); + RETURN_IF_ERROR(apply_guide(feature->feature_name(), col)); + RETURN_IF_ERROR(set_num_examples(feature->indexed_data().size(), + feature->NumBatches())); return absl::OkStatus(); }, [&](SimpleMLCategoricalIntFeature::Resource* feature, - const int feature_idx) { + const int feature_idx) -> absl::Status { auto* col = dataset->mutable_data_spec()->mutable_columns(feature_idx); col->set_name(feature->feature_name()); col->set_type(dataset::proto::ColumnType::CATEGORICAL); - TF_RETURN_IF_ERROR(apply_guide(feature->feature_name(), col)); + RETURN_IF_ERROR(apply_guide(feature->feature_name(), col)); // Both in TF-DF and SimpleML Estimator, integer values are offset by 1. // See CATEGORICAL_INTEGER_OFFSET. col->mutable_categorical()->set_offset_value_by_one_during_training( @@ -365,29 +371,30 @@ absl::Status FeatureSet::InitializeDatasetFromFeatures( return set_num_examples(feature->data().size(), feature->NumBatches()); }, [&](SimpleMLCategoricalSetStringFeature::Resource* feature, - const int feature_idx) { + const int feature_idx) -> absl::Status { auto* col = dataset->mutable_data_spec()->mutable_columns(feature_idx); col->set_name(feature->feature_name()); col->set_type(dataset::proto::ColumnType::CATEGORICAL_SET); - TF_RETURN_IF_ERROR(apply_guide(feature->feature_name(), col)); + RETURN_IF_ERROR(apply_guide(feature->feature_name(), col)); return set_num_examples(feature->num_examples(), feature->num_batches()); }, [&](SimpleMLCategoricalSetIntFeature::Resource* feature, - const int feature_idx) { + const int feature_idx) -> absl::Status { auto* col = dataset->mutable_data_spec()->mutable_columns(feature_idx); col->set_name(feature->feature_name()); col->set_type(dataset::proto::ColumnType::CATEGORICAL_SET); - TF_RETURN_IF_ERROR(apply_guide(feature->feature_name(), col)); + RETURN_IF_ERROR(apply_guide(feature->feature_name(), col)); col->mutable_categorical()->set_is_already_integerized(true); return set_num_examples(feature->num_examples(), feature->num_batches()); }, - [&](SimpleMLHashFeature::Resource* feature, const int feature_idx) { + [&](SimpleMLHashFeature::Resource* feature, + const int feature_idx) -> absl::Status { auto* col = dataset->mutable_data_spec()->mutable_columns(feature_idx); col->set_name(feature->feature_name()); col->set_type(dataset::proto::ColumnType::HASH); - TF_RETURN_IF_ERROR(apply_guide(feature->feature_name(), col)); + RETURN_IF_ERROR(apply_guide(feature->feature_name(), col)); return set_num_examples(feature->data().size(), feature->NumBatches()); })); @@ -400,15 +407,16 @@ absl::Status FeatureSet::InitializeDatasetFromFeatures( "No training examples available."); } - TF_RETURN_IF_ERROR_FROM_ABSL_STATUS(dataset->CreateColumnsFromDataspec()); + RETURN_IF_ERROR(dataset->CreateColumnsFromDataspec()); dataset->mutable_data_spec()->set_created_num_rows(num_examples); dataset::proto::DataSpecificationAccumulator accumulator; dataset::InitializeDataspecAccumulator(dataset->data_spec(), &accumulator); - TF_RETURN_IF_ERROR(IterateFeatures( - [&](SimpleMLNumericalFeature::Resource* feature, const int feature_idx) { + RETURN_IF_ERROR(IterateFeatures( + [&](SimpleMLNumericalFeature::Resource* feature, + const int feature_idx) -> absl::Status { auto* col = dataset->mutable_data_spec()->mutable_columns(feature_idx); auto* col_acc = accumulator.mutable_columns(feature_idx); @@ -417,7 +425,7 @@ absl::Status FeatureSet::InitializeDatasetFromFeatures( col->type() == dataset::proto::ColumnType::DISCRETIZED_NUMERICAL; for (const auto value : feature->data()) { - TF_RETURN_IF_ERROR_FROM_ABSL_STATUS( + RETURN_IF_ERROR( dataset::UpdateNumericalColumnSpec(value, col, col_acc)); if (discretized) { dataset::UpdateComputeSpecDiscretizedNumerical(value, col, col_acc); @@ -426,43 +434,42 @@ absl::Status FeatureSet::InitializeDatasetFromFeatures( return absl::OkStatus(); }, [&](SimpleMLCategoricalStringFeature::Resource* feature, - const int feature_idx) { + const int feature_idx) -> absl::Status { auto* col = dataset->mutable_data_spec()->mutable_columns(feature_idx); auto* col_acc = accumulator.mutable_columns(feature_idx); const auto& reverse_index = feature->reverse_index(); for (const auto indexed_value : feature->indexed_data()) { - TF_RETURN_IF_ERROR_FROM_ABSL_STATUS( - dataset::UpdateCategoricalStringColumnSpec( - reverse_index[indexed_value], col, col_acc)); + RETURN_IF_ERROR(dataset::UpdateCategoricalStringColumnSpec( + reverse_index[indexed_value], col, col_acc)); } return absl::OkStatus(); }, [&](SimpleMLCategoricalIntFeature::Resource* feature, - const int feature_idx) { + const int feature_idx) -> absl::Status { auto* col = dataset->mutable_data_spec()->mutable_columns(feature_idx); auto* col_acc = accumulator.mutable_columns(feature_idx); for (const auto value : feature->data()) { - TF_RETURN_IF_ERROR_FROM_ABSL_STATUS( + RETURN_IF_ERROR( dataset::UpdateCategoricalIntColumnSpec(value, col, col_acc)); } return absl::OkStatus(); }, [&](SimpleMLCategoricalSetStringFeature::Resource* feature, - const int feature_idx) { + const int feature_idx) -> absl::Status { auto* col = dataset->mutable_data_spec()->mutable_columns(feature_idx); auto* col_acc = accumulator.mutable_columns(feature_idx); for (const auto& value : feature->values()) { - TF_RETURN_IF_ERROR_FROM_ABSL_STATUS( + RETURN_IF_ERROR( dataset::UpdateCategoricalStringColumnSpec(value, col, col_acc)); } return absl::OkStatus(); }, [&](SimpleMLCategoricalSetIntFeature::Resource* feature, - const int feature_idx) { + const int feature_idx) -> absl::Status { auto* col = dataset->mutable_data_spec()->mutable_columns(feature_idx); auto* col_acc = accumulator.mutable_columns(feature_idx); for (const auto value : feature->values()) { - TF_RETURN_IF_ERROR_FROM_ABSL_STATUS( + RETURN_IF_ERROR( dataset::UpdateCategoricalIntColumnSpec(value, col, col_acc)); } return absl::OkStatus(); @@ -472,8 +479,8 @@ absl::Status FeatureSet::InitializeDatasetFromFeatures( return absl::OkStatus(); })); - TF_RETURN_IF_ERROR_FROM_ABSL_STATUS(dataset::FinalizeComputeSpec( - guide, accumulator, dataset->mutable_data_spec())); + RETURN_IF_ERROR(dataset::FinalizeComputeSpec(guide, accumulator, + dataset->mutable_data_spec())); return absl::OkStatus(); } @@ -498,10 +505,10 @@ absl::Status FeatureSet::MoveExamplesFromFeaturesToDataset( return absl::OkStatus(); }; - TF_RETURN_IF_ERROR(IterateFeatures( + RETURN_IF_ERROR(IterateFeatures( [&](SimpleMLNumericalFeature::Resource* feature, const int feature_idx) -> absl::Status { - TF_RETURN_IF_ERROR(set_num_rows(feature->data().size(), feature)); + RETURN_IF_ERROR(set_num_rows(feature->data().size(), feature)); const auto& col = dataset->mutable_data_spec()->columns(feature_idx); // Is the numerical column discretized? @@ -509,7 +516,7 @@ absl::Status FeatureSet::MoveExamplesFromFeaturesToDataset( col.type() == dataset::proto::ColumnType::DISCRETIZED_NUMERICAL; if (discretized) { // Copy the discretized numerical values. - TF_ASSIGN_OR_RETURN_FROM_ABSL_STATUS( + ASSIGN_OR_RETURN( auto* col_data, dataset->MutableColumnWithCastWithStatus< dataset::VerticalDataset::DiscretizedNumericalColumn>( @@ -520,7 +527,7 @@ absl::Status FeatureSet::MoveExamplesFromFeaturesToDataset( } } else { // Copy the non discretized values. - TF_ASSIGN_OR_RETURN_FROM_ABSL_STATUS( + ASSIGN_OR_RETURN( auto* col_data, dataset->MutableColumnWithCastWithStatus< dataset::VerticalDataset::NumericalColumn>(feature_idx)); @@ -531,10 +538,9 @@ absl::Status FeatureSet::MoveExamplesFromFeaturesToDataset( }, [&](SimpleMLCategoricalStringFeature::Resource* feature, const int feature_idx) -> absl::Status { - TF_RETURN_IF_ERROR( - set_num_rows(feature->indexed_data().size(), feature)); + RETURN_IF_ERROR(set_num_rows(feature->indexed_data().size(), feature)); const auto& col_spec = dataset->data_spec().columns(feature_idx); - TF_ASSIGN_OR_RETURN_FROM_ABSL_STATUS( + ASSIGN_OR_RETURN( auto* col_data, dataset->MutableColumnWithCastWithStatus< dataset::VerticalDataset::CategoricalColumn>(feature_idx)); @@ -545,7 +551,7 @@ absl::Status FeatureSet::MoveExamplesFromFeaturesToDataset( if (value.empty()) { col_data->AddNA(); } else { - TF_ASSIGN_OR_RETURN_FROM_ABSL_STATUS( + ASSIGN_OR_RETURN( auto int_value, dataset::CategoricalStringToValueWithStatus(value, col_spec)); col_data->Add(int_value); @@ -557,9 +563,9 @@ absl::Status FeatureSet::MoveExamplesFromFeaturesToDataset( }, [&](SimpleMLCategoricalIntFeature::Resource* feature, const int feature_idx) -> absl::Status { - TF_RETURN_IF_ERROR(set_num_rows(feature->data().size(), feature)); + RETURN_IF_ERROR(set_num_rows(feature->data().size(), feature)); const auto& col_spec = dataset->data_spec().columns(feature_idx); - TF_ASSIGN_OR_RETURN_FROM_ABSL_STATUS( + ASSIGN_OR_RETURN( auto* col_data, dataset->MutableColumnWithCastWithStatus< dataset::VerticalDataset::CategoricalColumn>(feature_idx)); @@ -580,9 +586,9 @@ absl::Status FeatureSet::MoveExamplesFromFeaturesToDataset( }, [&](SimpleMLCategoricalSetStringFeature::Resource* feature, const int feature_idx) -> absl::Status { - TF_RETURN_IF_ERROR(set_num_rows(feature->num_examples(), feature)); + RETURN_IF_ERROR(set_num_rows(feature->num_examples(), feature)); const auto& col_spec = dataset->data_spec().columns(feature_idx); - TF_ASSIGN_OR_RETURN_FROM_ABSL_STATUS( + ASSIGN_OR_RETURN( auto* col_data, dataset->MutableColumnWithCastWithStatus< dataset::VerticalDataset::CategoricalSetColumn>(feature_idx)); @@ -600,10 +606,9 @@ absl::Status FeatureSet::MoveExamplesFromFeaturesToDataset( for (int value_idx = begin_value_idx; value_idx < end_value_idx; value_idx++) { const auto& value_str = feature->values()[value_idx]; - TF_ASSIGN_OR_RETURN_FROM_ABSL_STATUS( - const int32_t value, - dataset::CategoricalStringToValueWithStatus(value_str, - col_spec)); + ASSIGN_OR_RETURN(const int32_t value, + dataset::CategoricalStringToValueWithStatus( + value_str, col_spec)); tmp_value.push_back(value); } @@ -619,9 +624,9 @@ absl::Status FeatureSet::MoveExamplesFromFeaturesToDataset( }, [&](SimpleMLCategoricalSetIntFeature::Resource* feature, const int feature_idx) -> absl::Status { - TF_RETURN_IF_ERROR(set_num_rows(feature->num_examples(), feature)); + RETURN_IF_ERROR(set_num_rows(feature->num_examples(), feature)); const auto& col_spec = dataset->data_spec().columns(feature_idx); - TF_ASSIGN_OR_RETURN_FROM_ABSL_STATUS( + ASSIGN_OR_RETURN( auto* col_data, dataset->MutableColumnWithCastWithStatus< dataset::VerticalDataset::CategoricalSetColumn>(feature_idx)); @@ -671,8 +676,8 @@ absl::Status FeatureSet::MoveExamplesFromFeaturesToDataset( }, [&](SimpleMLHashFeature::Resource* feature, const int feature_idx) -> absl::Status { - TF_RETURN_IF_ERROR(set_num_rows(feature->data().size(), feature)); - TF_ASSIGN_OR_RETURN_FROM_ABSL_STATUS( + RETURN_IF_ERROR(set_num_rows(feature->data().size(), feature)); + ASSIGN_OR_RETURN( auto* col_data, dataset->MutableColumnWithCastWithStatus< dataset::VerticalDataset::HashColumn>(feature_idx)); @@ -814,10 +819,9 @@ class SimpleMLModelTrainer : public tensorflow::OpKernel { model::proto::TrainingConfig config = training_config_; std::unique_ptr learner; - OP_REQUIRES_OK(ctx, utils::FromUtilStatus(GetLearner(config, &learner))); + OP_REQUIRES_OK(ctx, GetLearner(config, &learner)); - OP_REQUIRES_OK( - ctx, utils::FromUtilStatus(learner->SetHyperParameters(hparams_))); + OP_REQUIRES_OK(ctx, learner->SetHyperParameters(hparams_)); *learner->mutable_deployment() = deployment_config_; if (!model_dir_.empty()) { @@ -865,7 +869,7 @@ class SimpleMLModelTrainer : public tensorflow::OpKernel { // Create a std::function to train the model. // - // Note: The capture of std::function should be copiable. + // Note: The capture of std::function should be copyable. struct TrainingState { std::string model_dir; bool use_file_prefix; @@ -899,8 +903,7 @@ class SimpleMLModelTrainer : public tensorflow::OpKernel { } #ifdef TFDF_STOP_TRAINING_ON_INTERRUPT - RETURN_IF_ERROR( - utils::ToUtilStatus(interruption::DisableUserInterruption())); + RETURN_IF_ERROR(interruption::DisableUserInterruption()); #endif RETURN_IF_ERROR(model.status()); @@ -948,7 +951,7 @@ class SimpleMLModelTrainer : public tensorflow::OpKernel { auto process_id_or = StartLongRunningProcess(ctx, std::move(async_train)); if (!process_id_or.ok()) { - OP_REQUIRES_OK(ctx, utils::FromUtilStatus(process_id_or.status())); + OP_REQUIRES_OK(ctx, process_id_or.status()); } output_tensor->scalar()() = process_id_or.value(); @@ -957,7 +960,7 @@ class SimpleMLModelTrainer : public tensorflow::OpKernel { auto status_or = GetLongRunningProcessStatus(ctx, process_id_or.value()); if (!status_or.ok()) { - OP_REQUIRES_OK(ctx, utils::FromUtilStatus(status_or.status())); + OP_REQUIRES_OK(ctx, status_or.status()); } if (status_or.value() == LongRunningProcessStatus::kSuccess) { break; @@ -972,11 +975,10 @@ class SimpleMLModelTrainer : public tensorflow::OpKernel { const std::vector& resource_ids, dataset::VerticalDataset* dataset) { FeatureSet feature_set; - TF_RETURN_IF_ERROR( - feature_set.Link(ctx, resource_ids, nullptr, dataset_type)); - TF_RETURN_IF_ERROR( + RETURN_IF_ERROR(feature_set.Link(ctx, resource_ids, nullptr, dataset_type)); + RETURN_IF_ERROR( feature_set.InitializeDatasetFromFeatures(ctx, guide_, dataset)); - TF_RETURN_IF_ERROR( + RETURN_IF_ERROR( feature_set.MoveExamplesFromFeaturesToDataset(ctx, dataset)); return absl::OkStatus(); } @@ -1060,10 +1062,8 @@ class SimpleMLCheckTrainingConfiguration : public tensorflow::OpKernel { // Check the parameters by creating a learner. std::unique_ptr learner; - OP_REQUIRES_OK( - ctx, utils::FromUtilStatus(GetLearner(training_config_, &learner))); - OP_REQUIRES_OK( - ctx, utils::FromUtilStatus(learner->SetHyperParameters(hparams_))); + OP_REQUIRES_OK(ctx, GetLearner(training_config_, &learner)); + OP_REQUIRES_OK(ctx, learner->SetHyperParameters(hparams_)); } model::proto::GenericHyperParameters hparams_; @@ -1087,7 +1087,7 @@ class AbstractSimpleMLModelOp : public tensorflow::OpKernel { // Called when the op is applied. If "model"==nullptr, the model is not // available. virtual void ComputeModel(tf::OpKernelContext* ctx, - const model::AbstractModel* const model) = 0; + const model::AbstractModel* model) = 0; void Compute(tf::OpKernelContext* ctx) override { YggdrasilModelContainer* model_container; @@ -1178,32 +1178,32 @@ REGISTER_KERNEL_BUILDER( #ifdef TFDF_STOP_TRAINING_ON_INTERRUPT namespace interruption { -tf::Status EnableUserInterruption() { +absl::Status EnableUserInterruption() { // Detect interrupt signals. const bool set_signal_handler = active_learners.fetch_add(1) == 0; if (set_signal_handler) { stop_training = false; previous_signal_handler = std::signal(SIGINT, StopTrainingSignalHandler); if (previous_signal_handler == SIG_ERR) { - TF_RETURN_IF_ERROR(tf::Status( + RETURN_IF_ERROR(absl::Status( static_cast(absl::StatusCode::kInvalidArgument), "Cannot change the std::signal handler.")); } } - return tf::OkStatus(); + return absl::OkStatus(); } -tf::Status DisableUserInterruption() { +absl::Status DisableUserInterruption() { const bool restore_signal_handler = active_learners.fetch_sub(1) == 1; if (restore_signal_handler) { // Restore the previous signal handler. if (std::signal(SIGINT, previous_signal_handler) == SIG_ERR) { - TF_RETURN_IF_ERROR(tf::Status( + RETURN_IF_ERROR(absl::Status( static_cast(absl::StatusCode::kInvalidArgument), "Cannot restore the std::signal handler.")); } } - return tf::OkStatus(); + return absl::OkStatus(); } } // namespace interruption diff --git a/tensorflow_decision_forests/tensorflow/ops/training/kernel.h b/tensorflow_decision_forests/tensorflow/ops/training/kernel.h index 620e67a9..d33bd1e9 100644 --- a/tensorflow_decision_forests/tensorflow/ops/training/kernel.h +++ b/tensorflow_decision_forests/tensorflow/ops/training/kernel.h @@ -16,10 +16,18 @@ #ifndef TENSORFLOW_DECISION_FORESTS_TENSORFLOW_OPS_TRAINING_TRAINING_H_ #define TENSORFLOW_DECISION_FORESTS_TENSORFLOW_OPS_TRAINING_TRAINING_H_ +#include +#include +#include +#include +#include + +#include "absl/status/status.h" +#include "absl/status/statusor.h" +#include "absl/strings/string_view.h" #include "tensorflow/core/framework/op_kernel.h" #include "tensorflow/core/framework/resource_mgr.h" #include "yggdrasil_decision_forests/model/abstract_model.h" -#include "yggdrasil_decision_forests/utils/tensorflow.h" namespace tensorflow_decision_forests { namespace ops { @@ -67,10 +75,10 @@ inline std::atomic active_learners{0}; inline void StopTrainingSignalHandler(int signal) { stop_training = true; } // Enables the interruption listener. -tensorflow::Status EnableUserInterruption(); +absl::Status EnableUserInterruption(); // Disable the interruption listener -tensorflow::Status DisableUserInterruption(); +absl::Status DisableUserInterruption(); } // namespace interruption #endif @@ -82,7 +90,7 @@ class YggdrasilModelContainer : public tensorflow::ResourceBase { std::string DebugString() const override { return "YggdrasilModelContainer"; } - tensorflow::Status LoadModel(const absl::string_view model_path); + absl::Status LoadModel(absl::string_view model_path); tensorflow::int64 MemoryUsed() const override { return approximate_model_size_in_memory_; @@ -97,7 +105,7 @@ class YggdrasilModelContainer : public tensorflow::ResourceBase { return *model_; } - const int num_label_classes() const { return num_label_classes_; } + int num_label_classes() const { return num_label_classes_; } const std::vector& output_class_representation() const { return output_class_representation_; diff --git a/tensorflow_decision_forests/tensorflow/ops/training/kernel_grpc_worker.cc b/tensorflow_decision_forests/tensorflow/ops/training/kernel_grpc_worker.cc index 47311015..3f96686b 100644 --- a/tensorflow_decision_forests/tensorflow/ops/training/kernel_grpc_worker.cc +++ b/tensorflow_decision_forests/tensorflow/ops/training/kernel_grpc_worker.cc @@ -14,15 +14,19 @@ */ #include +#include +#include #include "absl/log/log.h" -#include "absl/random/random.h" +#include "absl/memory/memory.h" #include "absl/status/status.h" +#include "absl/strings/str_cat.h" #include "tensorflow_decision_forests/tensorflow/ops/training/kernel.h" #include "yggdrasil_decision_forests/utils/concurrency.h" #include "yggdrasil_decision_forests/utils/distribute/implementations/grpc/grpc_manager.h" #include "yggdrasil_decision_forests/utils/distribute/implementations/grpc/grpc_worker.h" #include "yggdrasil_decision_forests/utils/logging.h" +#include "yggdrasil_decision_forests/utils/status_macros.h" #include "yggdrasil_decision_forests/utils/synchronization_primitives.h" namespace tensorflow_decision_forests { @@ -115,8 +119,7 @@ class SimpleMLCreateYDFGRPCWorker : public tensorflow::OpKernel { kTFContainer, absl::StrCat(key_), &server_resource, [&](YDFGRPCServerResource** resource) -> absl::Status { *resource = new YDFGRPCServerResource(); - return utils::FromUtilStatus( - (*resource)->StartServer(force_ydf_port_)); + return (*resource)->StartServer(force_ydf_port_); })); // Returns the server port. diff --git a/tensorflow_decision_forests/tensorflow/ops/training/kernel_long_process.cc b/tensorflow_decision_forests/tensorflow/ops/training/kernel_long_process.cc index 8edcfa41..2d472e89 100644 --- a/tensorflow_decision_forests/tensorflow/ops/training/kernel_long_process.cc +++ b/tensorflow_decision_forests/tensorflow/ops/training/kernel_long_process.cc @@ -17,9 +17,21 @@ // // See "op.cc" for the documentation of the ops. // +#include +#include +#include +#include +#include + +#include "absl/log/check.h" +#include "absl/memory/memory.h" #include "absl/random/random.h" +#include "absl/status/status.h" +#include "absl/status/statusor.h" +#include "absl/strings/str_cat.h" #include "tensorflow_decision_forests/tensorflow/ops/training/kernel.h" #include "yggdrasil_decision_forests/utils/concurrency.h" +#include "yggdrasil_decision_forests/utils/status_macros.h" #include "yggdrasil_decision_forests/utils/synchronization_primitives.h" namespace tensorflow_decision_forests { @@ -97,11 +109,8 @@ absl::StatusOr StartLongRunningProcess( // Start the process. auto* process_container = new RunningProcessResource(); - auto status = ctx->resource_manager()->Create( - kProcessContainer, absl::StrCat(process_id), process_container); - if (!status.ok()) { - return utils::ToUtilStatus(status); - } + RETURN_IF_ERROR(ctx->resource_manager()->Create( + kProcessContainer, absl::StrCat(process_id), process_container)); process_container->Run(std::move(call)); return process_id; } @@ -115,7 +124,7 @@ absl::StatusOr GetLongRunningProcessStatus( ctx->resource_manager()->Lookup( kProcessContainer, resource_name, &process_container); if (!find_container_status.ok()) { - return utils::ToUtilStatus(find_container_status); + return find_container_status; } auto process_status = process_container->GetStatus(); @@ -124,12 +133,8 @@ absl::StatusOr GetLongRunningProcessStatus( if (!process_status.ok() || process_status.value() == LongRunningProcessStatus::kSuccess) { // Release the process container if the run is done (success or failure). - auto delete_container_status = - ctx->resource_manager()->Delete( - kProcessContainer, resource_name); - if (!find_container_status.ok()) { - return utils::ToUtilStatus(delete_container_status); - } + RETURN_IF_ERROR(ctx->resource_manager()->Delete( + kProcessContainer, resource_name)); } // Return the status. @@ -149,7 +154,7 @@ class SimpleMLCheckStatus : public tensorflow::OpKernel { // Check process status. auto status_or = GetLongRunningProcessStatus(ctx, process_id); if (!status_or.ok()) { - OP_REQUIRES_OK(ctx, utils::FromUtilStatus(status_or.status())); + OP_REQUIRES_OK(ctx, status_or.status()); } // Output process status. diff --git a/tensorflow_decision_forests/tensorflow/ops/training/kernel_on_file.cc b/tensorflow_decision_forests/tensorflow/ops/training/kernel_on_file.cc index 5ed38341..ae18e61e 100644 --- a/tensorflow_decision_forests/tensorflow/ops/training/kernel_on_file.cc +++ b/tensorflow_decision_forests/tensorflow/ops/training/kernel_on_file.cc @@ -19,11 +19,22 @@ // dataset reader is registered). // -#include +#include +#include +#include +#include #include "absl/log/log.h" +#include "absl/status/status.h" +#include "absl/types/optional.h" #include "tensorflow/core/framework/op_kernel.h" +#include "tensorflow/core/framework/op_requires.h" +#include "tensorflow/core/framework/tensor.h" +#include "tensorflow/core/framework/tensor_shape.h" +#include "tensorflow/core/framework/types.h" +#include "tensorflow/core/platform/path.h" #include "tensorflow_decision_forests/tensorflow/ops/training/feature_on_file.h" +#include "tensorflow_decision_forests/tensorflow/ops/training/features.h" #include "tensorflow_decision_forests/tensorflow/ops/training/kernel.h" #include "yggdrasil_decision_forests/dataset/data_spec.h" #include "yggdrasil_decision_forests/dataset/data_spec.pb.h" @@ -34,14 +45,13 @@ #include "yggdrasil_decision_forests/model/decision_tree/decision_forest_interface.h" #include "yggdrasil_decision_forests/model/model_library.h" #include "yggdrasil_decision_forests/utils/logging.h" -#include "yggdrasil_decision_forests/utils/tensorflow.h" +#include "yggdrasil_decision_forests/utils/status_macros.h" namespace tensorflow_decision_forests { namespace ops { namespace tf = ::tensorflow; namespace model = ::yggdrasil_decision_forests::model; -namespace utils = ::yggdrasil_decision_forests::utils; namespace dataset = ::yggdrasil_decision_forests::dataset; REGISTER_KERNEL_BUILDER( @@ -81,25 +91,21 @@ class SimpleMLModelTrainerOnFile : public tensorflow::OpKernel { OP_REQUIRES_OK(ctx, ctx->GetAttr("node_format", &node_format_)); if (model_id_.empty()) { - OP_REQUIRES_OK(ctx, absl::Status(static_cast( - absl::StatusCode::kInvalidArgument), - "Model id is empty")); + OP_REQUIRES_OK(ctx, absl::InvalidArgumentError("Model id is empty")); } std::string serialized_guide; OP_REQUIRES_OK(ctx, ctx->GetAttr("guide", &serialized_guide)); if (!guide_.ParseFromString(serialized_guide)) { - OP_REQUIRES_OK(ctx, absl::Status(static_cast( - absl::StatusCode::kInvalidArgument), - "Cannot de-serialize guide proto.")); + OP_REQUIRES_OK( + ctx, absl::InvalidArgumentError("Cannot de-serialize guide proto.")); } std::string hparams; OP_REQUIRES_OK(ctx, ctx->GetAttr("hparams", &hparams)); if (!hparams_.ParseFromString(hparams)) { - OP_REQUIRES_OK(ctx, absl::Status(static_cast( - absl::StatusCode::kInvalidArgument), - "Cannot de-serialize hparams proto.")); + OP_REQUIRES_OK(ctx, absl::InvalidArgumentError( + "Cannot de-serialize hparams proto.")); } { @@ -107,10 +113,8 @@ class SimpleMLModelTrainerOnFile : public tensorflow::OpKernel { OP_REQUIRES_OK( ctx, ctx->GetAttr("training_config", &serialized_training_config)); if (!training_config_.MergeFromString(serialized_training_config)) { - OP_REQUIRES_OK( - ctx, absl::Status(static_cast( - absl::StatusCode::kInvalidArgument), - "Cannot de-serialize training_config proto.")); + OP_REQUIRES_OK(ctx, absl::InvalidArgumentError( + "Cannot de-serialize training_config proto.")); } } @@ -119,10 +123,9 @@ class SimpleMLModelTrainerOnFile : public tensorflow::OpKernel { OP_REQUIRES_OK(ctx, ctx->GetAttr("deployment_config", &serialized_deployment_config)); if (!deployment_config_.MergeFromString(serialized_deployment_config)) { - OP_REQUIRES_OK( - ctx, absl::Status(static_cast( - absl::StatusCode::kInvalidArgument), - "Cannot de-serialize deployment_config proto.")); + OP_REQUIRES_OK(ctx, + absl::InvalidArgumentError( + "Cannot de-serialize deployment_config proto.")); } } } @@ -134,15 +137,13 @@ class SimpleMLModelTrainerOnFile : public tensorflow::OpKernel { // TODO: Cache the dataspec. dataset::proto::DataSpecification data_spec; - OP_REQUIRES_OK(ctx, utils::FromUtilStatus(dataset::CreateDataSpecWithStatus( - train_dataset_path_, false, guide_, &data_spec))); + OP_REQUIRES_OK(ctx, dataset::CreateDataSpecWithStatus( + train_dataset_path_, false, guide_, &data_spec)); LOG(INFO) << "Dataset:\n" << dataset::PrintHumanReadable(data_spec, false); std::unique_ptr learner; - OP_REQUIRES_OK( - ctx, utils::FromUtilStatus(GetLearner(training_config_, &learner))); - OP_REQUIRES_OK( - ctx, utils::FromUtilStatus(learner->SetHyperParameters(hparams_))); + OP_REQUIRES_OK(ctx, GetLearner(training_config_, &learner)); + OP_REQUIRES_OK(ctx, learner->SetHyperParameters(hparams_)); *learner->mutable_deployment() = deployment_config_; if (!model_dir_.empty()) { learner->set_log_directory(tf::io::JoinPath(model_dir_, "train_logs")); @@ -169,7 +170,7 @@ class SimpleMLModelTrainerOnFile : public tensorflow::OpKernel { // Create a std::function to train the model. // - // Note: The capture of std::function should be copiable. + // Note: The capture of std::function should be copyable. struct TrainingState { std::string model_dir; std::string train_dataset_path; @@ -202,8 +203,7 @@ class SimpleMLModelTrainerOnFile : public tensorflow::OpKernel { training_state->valid_dataset_path); #ifdef TFDF_STOP_TRAINING_ON_INTERRUPT - RETURN_IF_ERROR( - utils::ToUtilStatus(interruption::DisableUserInterruption())); + RETURN_IF_ERROR(interruption::DisableUserInterruption()); #endif RETURN_IF_ERROR(model.status()); @@ -251,7 +251,7 @@ class SimpleMLModelTrainerOnFile : public tensorflow::OpKernel { auto process_id_or = StartLongRunningProcess(ctx, std::move(async_train)); if (!process_id_or.ok()) { - OP_REQUIRES_OK(ctx, utils::FromUtilStatus(process_id_or.status())); + OP_REQUIRES_OK(ctx, process_id_or.status()); } tf::Tensor* output_tensor = nullptr;