Skip to content

Commit

Permalink
Repro done
Browse files Browse the repository at this point in the history
  • Loading branch information
zanmato1984 committed Jun 13, 2024
1 parent b24b561 commit b8395d0
Showing 1 changed file with 26 additions and 6 deletions.
32 changes: 26 additions & 6 deletions cpp/src/arrow/dataset/dataset_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -802,9 +802,10 @@ TEST(TestDictPartitionColumn, SelectPartitionColumnFilterPhysicalColumn) {
*ArrayFromJSON(partition_field->type(), R"(["one"])"));
}

namespace fs = arrow::fs;
namespace ds = arrow::dataset;
namespace ac = arrow::acero;
namespace cp = arrow::compute;
namespace ds = arrow::dataset;
namespace fs = arrow::fs;

arrow::Result<std::shared_ptr<fs::FileSystem>> GetFileSystemFromUri(
const std::string& uri, std::string* path) {
Expand All @@ -820,6 +821,7 @@ arrow::Result<std::shared_ptr<ds::Dataset>> GetDatasetFromDirectory(
s.recursive = true;

ds::FileSystemFactoryOptions options;
options.partitioning = DirectoryPartitioning::MakeFactory({"year", "month"});
// The factory will try to build a child dataset.
ARROW_ASSIGN_OR_RAISE(auto factory,
ds::FileSystemDatasetFactory::Make(fs, s, format, options));
Expand All @@ -845,18 +847,36 @@ arrow::Result<std::shared_ptr<ds::Scanner>> GetScannerFromDataset(
return scanner_builder->Finish();
}

arrow::Status ExecutePlanAndCollectAsTable(ac::Declaration plan) {
// collect sink_reader into a Table
std::shared_ptr<arrow::Table> response_table;
ARROW_ASSIGN_OR_RAISE(response_table, ac::DeclarationToTable(std::move(plan)));

std::cout << "Results : " << response_table->ToString() << std::endl;

return arrow::Status::OK();
}

TEST(GH41813, GH41813) {
std::string uri =
"file:///Users/zanmato/Downloads/arrow_segfault_reproducer_2/data/reduced_attempt3";
std::string path;
auto format = std::make_shared<ds::ParquetFileFormat>();
ASSERT_OK_AND_ASSIGN(auto fs, GetFileSystemFromUri(uri, &path));
ASSERT_OK_AND_ASSIGN(auto dataset, GetDatasetFromDirectory(fs, format, path));

ASSERT_OK_AND_ASSIGN(auto scanner, GetScannerFromDataset(dataset));

ASSERT_OK_AND_ASSIGN(auto table, scanner->ToTable());
std::cout << "Table size: " << table->num_rows() << "\n";
auto scan_options = std::make_shared<arrow::dataset::ScanOptions>();
scan_options->projection = cp::project({}, {}); // create empty projection
auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, scan_options};
ac::Declaration scan{"scan", std::move(scan_node_options)};

auto count_options = std::make_shared<cp::CountOptions>(cp::CountOptions::ONLY_VALID);
auto aggregate_options = ac::AggregateNodeOptions{
/*aggregates=*/{{"hash_count", count_options, "date", "count(date)"}},
/*keys=*/{"year", "month", "cid"}};
ac::Declaration aggregate{"aggregate", {std::move(scan)}, std::move(aggregate_options)};

ASSERT_OK(ExecutePlanAndCollectAsTable(std::move(aggregate)));
}

} // namespace dataset
Expand Down

0 comments on commit b8395d0

Please sign in to comment.