Skip to content

Commit

Permalink
Merge branch 'master' into fly_0826_reuse
Browse files Browse the repository at this point in the history
  • Loading branch information
liyafan82 committed Aug 28, 2019
2 parents a1f7046 + d4d4a12 commit 53c1e0b
Show file tree
Hide file tree
Showing 106 changed files with 2,225 additions and 444 deletions.
10 changes: 5 additions & 5 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -669,12 +669,12 @@ if(ARROW_STATIC_LINK_LIBS)
add_dependencies(arrow_dependencies ${ARROW_STATIC_LINK_LIBS})
endif()

set(ARROW_SHARED_PRIVATE_LINK_LIBS ${ARROW_STATIC_LINK_LIBS} ${BOOST_SYSTEM_LIBRARY}
${BOOST_FILESYSTEM_LIBRARY})
set(ARROW_SHARED_PRIVATE_LINK_LIBS ${ARROW_STATIC_LINK_LIBS} ${BOOST_FILESYSTEM_LIBRARY}
${BOOST_SYSTEM_LIBRARY})

list(APPEND ARROW_STATIC_LINK_LIBS ${BOOST_SYSTEM_LIBRARY} ${BOOST_FILESYSTEM_LIBRARY})
list(APPEND ARROW_STATIC_LINK_LIBS ${BOOST_FILESYSTEM_LIBRARY} ${BOOST_SYSTEM_LIBRARY})

list(APPEND ARROW_STATIC_INSTALL_INTERFACE_LIBS boost_system boost_filesystem boost_regex)
list(APPEND ARROW_STATIC_INSTALL_INTERFACE_LIBS boost_filesystem boost_system boost_regex)

if(NOT MSVC)
list(APPEND ARROW_LINK_LIBS ${CMAKE_DL_LIBS})
Expand All @@ -701,8 +701,8 @@ set(ARROW_TEST_SHARED_LINK_LIBS
arrow_shared
${ARROW_LINK_LIBS}
${double-conversion_LIBRARIES}
${BOOST_SYSTEM_LIBRARY}
${BOOST_FILESYSTEM_LIBRARY}
${BOOST_SYSTEM_LIBRARY}
${ARROW_TEST_LINK_TOOLCHAIN})

if(NOT MSVC)
Expand Down
123 changes: 99 additions & 24 deletions cpp/src/arrow/compare.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1008,38 +1008,39 @@ bool ArrayRangeEquals(const Array& left, const Array& right, int64_t left_start_
return are_equal;
}

bool StridedTensorContentEquals(int dim_index, int64_t left_offset, int64_t right_offset,
int elem_size, const Tensor& left, const Tensor& right) {
namespace {

bool StridedIntegerTensorContentEquals(const int dim_index, int64_t left_offset,
int64_t right_offset, int elem_size,
const Tensor& left, const Tensor& right) {
const auto n = left.shape()[dim_index];
const auto left_stride = left.strides()[dim_index];
const auto right_stride = right.strides()[dim_index];
if (dim_index == left.ndim() - 1) {
for (int64_t i = 0; i < left.shape()[dim_index]; ++i) {
if (memcmp(left.raw_data() + left_offset + i * left.strides()[dim_index],
right.raw_data() + right_offset + i * right.strides()[dim_index],
elem_size) != 0) {
for (int64_t i = 0; i < n; ++i) {
if (memcmp(left.raw_data() + left_offset + i * left_stride,
right.raw_data() + right_offset + i * right_stride, elem_size) != 0) {
return false;
}
}
return true;
}
for (int64_t i = 0; i < left.shape()[dim_index]; ++i) {
if (!StridedTensorContentEquals(dim_index + 1, left_offset, right_offset, elem_size,
left, right)) {
for (int64_t i = 0; i < n; ++i) {
if (!StridedIntegerTensorContentEquals(dim_index + 1, left_offset, right_offset,
elem_size, left, right)) {
return false;
}
left_offset += left.strides()[dim_index];
right_offset += right.strides()[dim_index];
left_offset += left_stride;
right_offset += right_stride;
}
return true;
}

bool TensorEquals(const Tensor& left, const Tensor& right) {
bool IntegerTensorEquals(const Tensor& left, const Tensor& right) {
bool are_equal;
// The arrays are the same object
if (&left == &right) {
are_equal = true;
} else if (left.type_id() != right.type_id()) {
are_equal = false;
} else if (left.size() == 0) {
are_equal = true;
} else {
const bool left_row_major_p = left.is_row_major();
const bool left_column_major_p = left.is_column_major();
Expand All @@ -1048,14 +1049,9 @@ bool TensorEquals(const Tensor& left, const Tensor& right) {

if (!(left_row_major_p && right_row_major_p) &&
!(left_column_major_p && right_column_major_p)) {
const auto& shape = left.shape();
if (shape != right.shape()) {
are_equal = false;
} else {
const auto& type = checked_cast<const FixedWidthType&>(*left.type());
are_equal =
StridedTensorContentEquals(0, 0, 0, type.bit_width() / 8, left, right);
}
const auto& type = checked_cast<const FixedWidthType&>(*left.type());
are_equal =
StridedIntegerTensorContentEquals(0, 0, 0, type.bit_width() / 8, left, right);
} else {
const auto& size_meta = checked_cast<const FixedWidthType&>(*left.type());
const int byte_width = size_meta.bit_width() / CHAR_BIT;
Expand All @@ -1071,6 +1067,85 @@ bool TensorEquals(const Tensor& left, const Tensor& right) {
return are_equal;
}

/// \brief Recursively compare the contents of two strided floating-point tensors.
///
/// Dimension `dim_index` of both tensors is walked in lock-step. For every
/// dimension but the last, the function recurses into `dim_index + 1` with the
/// byte offsets advanced by the respective strides. In the last dimension the
/// individual values are compared.
///
/// \param dim_index dimension currently being traversed (0 for the outermost call)
/// \param left_offset byte offset into `left.raw_data()` for this sub-tensor
/// \param right_offset byte offset into `right.raw_data()` for this sub-tensor
/// \param left,right tensors to compare; the extent of each dimension is read
///        from `left` only, so callers are expected to have checked that the
///        shapes match
/// \param opts when `opts.nans_equal()` is true, two NaN values compare equal
/// \return true if every pair of visited elements compares equal
template <typename DataType>
bool StridedFloatTensorContentEquals(const int dim_index, int64_t left_offset,
                                     int64_t right_offset, const Tensor& left,
                                     const Tensor& right, const EqualOptions& opts) {
  using c_type = typename DataType::c_type;
  const auto n = left.shape()[dim_index];
  const auto left_stride = left.strides()[dim_index];
  const auto right_stride = right.strides()[dim_index];
  if (dim_index == left.ndim() - 1) {
    // Innermost dimension: compare the values themselves.
    const auto* left_data = left.raw_data() + left_offset;
    const auto* right_data = right.raw_data() + right_offset;
    const bool nans_equal = opts.nans_equal();
    for (int64_t i = 0; i < n; ++i) {
      // Load through memcpy instead of dereferencing a reinterpret_cast'd
      // pointer: the address is computed from byte offsets and strides, so it
      // is not guaranteed to be suitably aligned for c_type.
      c_type left_value;
      c_type right_value;
      memcpy(&left_value, left_data + i * left_stride, sizeof(c_type));
      memcpy(&right_value, right_data + i * right_stride, sizeof(c_type));
      if (left_value == right_value) {
        continue;
      }
      // IEEE comparison makes NaN != NaN; optionally treat a NaN pair as equal.
      if (nans_equal && std::isnan(left_value) && std::isnan(right_value)) {
        continue;
      }
      return false;
    }
    return true;
  }
  // Outer dimension: compare each sub-tensor along this axis.
  for (int64_t i = 0; i < n; ++i) {
    if (!StridedFloatTensorContentEquals<DataType>(dim_index + 1, left_offset,
                                                   right_offset, left, right, opts)) {
      return false;
    }
    left_offset += left_stride;
    right_offset += right_stride;
  }
  return true;
}

/// \brief Compare two tensors whose element type is the floating-point `DataType`.
///
/// Starts the strided element-wise comparison at the outermost dimension with
/// both byte offsets at zero. `DataType::c_type` must be a floating-point type;
/// this is enforced at compile time.
template <typename DataType>
bool FloatTensorEquals(const Tensor& left, const Tensor& right,
                       const EqualOptions& opts) {
  using value_type = typename DataType::c_type;
  static_assert(std::is_floating_point<value_type>::value,
                "DataType must be a floating point type");

  constexpr int kOutermostDim = 0;
  constexpr int64_t kStartOffset = 0;
  return StridedFloatTensorContentEquals<DataType>(kOutermostDim, kStartOffset,
                                                   kStartOffset, left, right, opts);
}

} // namespace

/// Returns true if the two tensors have the same element type and equal
/// contents. Two tensors that are both empty (size 0) compare equal; otherwise
/// the shapes must match. FLOAT and DOUBLE tensors are compared value by value,
/// honouring `opts`; every other type goes through IntegerTensorEquals.
bool TensorEquals(const Tensor& left, const Tensor& right, const EqualOptions& opts) {
  const auto type_id = left.type_id();

  if (type_id != right.type_id()) {
    return false;
  }
  if (left.size() == 0 && right.size() == 0) {
    return true;
  }
  if (left.shape() != right.shape()) {
    return false;
  }

  // TODO: Support half-float tensors (Type::HALF_FLOAT)
  if (type_id == Type::FLOAT) {
    return FloatTensorEquals<FloatType>(left, right, opts);
  }
  if (type_id == Type::DOUBLE) {
    return FloatTensorEquals<DoubleType>(left, right, opts);
  }
  return IntegerTensorEquals(left, right);
}

namespace {

template <typename LeftSparseIndexType, typename RightSparseIndexType>
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/arrow/compare.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ class EqualOptions {
bool ARROW_EXPORT ArrayEquals(const Array& left, const Array& right,
const EqualOptions& = EqualOptions::Defaults());

bool ARROW_EXPORT TensorEquals(const Tensor& left, const Tensor& right);
bool ARROW_EXPORT TensorEquals(const Tensor& left, const Tensor& right,
const EqualOptions& = EqualOptions::Defaults());

/// EXPERIMENTAL: Returns true if the given sparse tensors are exactly equal
bool ARROW_EXPORT SparseTensorEquals(const SparseTensor& left, const SparseTensor& right);
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/dataset/dataset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ Status SimpleDataFragment::Scan(std::shared_ptr<ScanContext> scan_context,
std::unique_ptr<ScanTaskIterator>* out) {
// Make an explicit copy of record_batches_ to ensure Scan can be called
// multiple times.
auto it = MakeIterator(record_batches_);
auto it = MakeVectorIterator(record_batches_);

// RecordBatch -> ScanTask
auto fn = [](std::shared_ptr<RecordBatch> batch) -> std::unique_ptr<ScanTask> {
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/dataset/dataset.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class ARROW_DS_EXPORT SimpleDataSource : public DataSource {

std::unique_ptr<DataFragmentIterator> GetFragments(
std::shared_ptr<ScanOptions> options) override {
return MakeIterator(fragments_);
return MakeVectorIterator(fragments_);
}

std::string type() const override { return "simple_data_source"; }
Expand Down
67 changes: 67 additions & 0 deletions cpp/src/arrow/dataset/file_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@

#include "arrow/dataset/file_base.h"

#include <algorithm>
#include <vector>

#include "arrow/filesystem/filesystem.h"
#include "arrow/io/interfaces.h"
#include "arrow/io/memory.h"
#include "arrow/util/stl.h"

namespace arrow {
namespace dataset {
Expand All @@ -41,5 +45,68 @@ Status FileBasedDataFragment::Scan(std::shared_ptr<ScanContext> scan_context,
return format_->ScanFile(source_, scan_options_, scan_context, out);
}

/// Stores the (non-owned) filesystem pointer, a copy of the selector, and takes
/// ownership of the format, scan options and the list of file stats.
FileSystemBasedDataSource::FileSystemBasedDataSource(
    fs::FileSystem* filesystem, const fs::Selector& selector,
    std::shared_ptr<FileFormat> format, std::shared_ptr<ScanOptions> scan_options,
    std::vector<fs::FileStats> stats)
    : filesystem_(filesystem),
      // `selector` is a const reference: std::move on it would silently fall
      // back to a copy, so copy explicitly.
      selector_(selector),
      format_(std::move(format)),
      scan_options_(std::move(scan_options)),
      stats_(std::move(stats)) {}

/// \brief Build a data source from the files matched by `selector`.
///
/// Queries `filesystem` for the stats of everything `selector` matches, then
/// drops every entry that is not a regular file or whose extension is not
/// recognized by `format`. The surviving entries are stored in the new data
/// source.
///
/// \param[in] filesystem filesystem to query; stored as a raw pointer
/// \param[in] selector describes which entries to fetch stats for
/// \param[in] format file format used to filter by extension
/// \param[in] scan_options scan options stored in the data source
/// \param[out] out receives the newly created data source
/// \return the error from GetTargetStats if it fails, otherwise Status::OK()
Status FileSystemBasedDataSource::Make(fs::FileSystem* filesystem,
                                       const fs::Selector& selector,
                                       std::shared_ptr<FileFormat> format,
                                       std::shared_ptr<ScanOptions> scan_options,
                                       std::unique_ptr<FileSystemBasedDataSource>* out) {
  std::vector<fs::FileStats> stats;
  RETURN_NOT_OK(filesystem->GetTargetStats(selector, &stats));

  // Keep only regular files with an extension the format knows about.
  const auto is_unsupported = [&format](const fs::FileStats& entry) {
    return entry.type() != fs::FileType::File ||
           !format->IsKnownExtension(entry.extension());
  };
  stats.erase(std::remove_if(stats.begin(), stats.end(), is_unsupported), stats.end());

  out->reset(new FileSystemBasedDataSource(filesystem, selector, std::move(format),
                                           std::move(scan_options), std::move(stats)));
  return Status::OK();
}

/// \brief Return an iterator yielding one DataFragment per file kept by Make().
///
/// The iterator works on its own copy of the file stats, so it can be called
/// repeatedly. Fragments are created lazily, one per call to Next(), using the
/// `options` passed here (not the scan options stored in the data source).
std::unique_ptr<DataFragmentIterator> FileSystemBasedDataSource::GetFragments(
    std::shared_ptr<ScanOptions> options) {
  struct Impl : DataFragmentIterator {
    Impl(fs::FileSystem* filesystem, std::shared_ptr<FileFormat> format,
         std::shared_ptr<ScanOptions> scan_options, std::vector<fs::FileStats> stats)
        : filesystem_(filesystem),
          format_(std::move(format)),
          scan_options_(std::move(scan_options)),
          stats_(std::move(stats)) {}

    // Writes the next fragment to `out`, or nullptr once all files are consumed.
    Status Next(std::shared_ptr<DataFragment>* out) override {
      if (i_ == stats_.size()) {
        *out = nullptr;
        return Status::OK();
      }
      FileSource src(stats_[i_++].path(), filesystem_);

      std::unique_ptr<DataFragment> fragment;
      RETURN_NOT_OK(format_->MakeFragment(src, scan_options_, &fragment));
      *out = std::move(fragment);
      return Status::OK();
    }

    size_t i_ = 0;  // index of the next entry of stats_ to turn into a fragment
    fs::FileSystem* filesystem_;
    std::shared_ptr<FileFormat> format_;
    std::shared_ptr<ScanOptions> scan_options_;
    std::vector<fs::FileStats> stats_;
  };

  return internal::make_unique<Impl>(filesystem_, format_, std::move(options), stats_);
}

} // namespace dataset
} // namespace arrow
39 changes: 39 additions & 0 deletions cpp/src/arrow/dataset/file_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "arrow/buffer.h"
#include "arrow/dataset/dataset.h"
#include "arrow/dataset/scanner.h"
#include "arrow/dataset/type_fwd.h"
#include "arrow/dataset/visibility.h"
#include "arrow/dataset/writer.h"
#include "arrow/filesystem/filesystem.h"
#include "arrow/io/file.h"
#include "arrow/util/compression.h"

Expand Down Expand Up @@ -130,6 +132,11 @@ class ARROW_DS_EXPORT FileFormat {
std::shared_ptr<ScanOptions> scan_options,
std::shared_ptr<ScanContext> scan_context,
std::unique_ptr<ScanTaskIterator>* out) const = 0;

/// \brief Open a fragment
///
/// \param[in] location the file source the fragment is created for
/// \param[in] opts scan options handed to the created fragment
/// \param[out] out receives the newly created fragment
/// \return Status of the operation
virtual Status MakeFragment(const FileSource& location,
std::shared_ptr<ScanOptions> opts,
std::unique_ptr<DataFragment>* out) = 0;
};

/// \brief A DataFragment that is stored in a file with a known format
Expand All @@ -147,11 +154,43 @@ class ARROW_DS_EXPORT FileBasedDataFragment : public DataFragment {
/// \brief The file source backing this fragment
const FileSource& source() const { return source_; }
/// \brief The file format stored with this fragment
std::shared_ptr<FileFormat> format() const { return format_; }

/// \brief The scan options stored with this fragment
std::shared_ptr<ScanOptions> scan_options() const override { return scan_options_; }

protected:
FileSource source_;
std::shared_ptr<FileFormat> format_;
std::shared_ptr<ScanOptions> scan_options_;
};

/// \brief A DataSource which takes files of one format from a directory
///
/// The directory is crawled upon construction (Make) and not updated afterward.
/// GetFragments() will not include files added after this DataSource is constructed and
/// will error if files are deleted/moved.
class ARROW_DS_EXPORT FileSystemBasedDataSource : public DataSource {
public:
/// \brief Create a data source from the entries of `filesystem` matched by `selector`
///
/// Only regular files whose extension is known to `format` are kept.
/// On success the new data source is written to `out`.
static Status Make(fs::FileSystem* filesystem, const fs::Selector& selector,
std::shared_ptr<FileFormat> format,
std::shared_ptr<ScanOptions> scan_options,
std::unique_ptr<FileSystemBasedDataSource>* out);

std::string type() const override { return "directory"; }

/// \brief Iterate over one fragment per file recorded at construction
std::unique_ptr<DataFragmentIterator> GetFragments(
std::shared_ptr<ScanOptions> options) override;

protected:
// Protected: instances are created through Make().
FileSystemBasedDataSource(fs::FileSystem* filesystem, const fs::Selector& selector,
std::shared_ptr<FileFormat> format,
std::shared_ptr<ScanOptions> scan_options,
std::vector<fs::FileStats> stats);

// Raw, non-owning pointer.
// NOTE(review): presumably the caller must keep the filesystem alive for the
// lifetime of this object and of iterators it hands out -- confirm.
fs::FileSystem* filesystem_ = NULLPTR;
// Copy of the selector passed to Make().
fs::Selector selector_;
// Format used to filter files by extension and to create fragments.
std::shared_ptr<FileFormat> format_;
// Scan options passed to Make().
std::shared_ptr<ScanOptions> scan_options_;
// Stats of the files retained by Make(); one fragment is produced per entry.
std::vector<fs::FileStats> stats_;
};

} // namespace dataset
} // namespace arrow
9 changes: 9 additions & 0 deletions cpp/src/arrow/dataset/file_parquet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "arrow/table.h"
#include "arrow/util/iterator.h"
#include "arrow/util/range.h"
#include "arrow/util/stl.h"
#include "parquet/arrow/reader.h"
#include "parquet/file_reader.h"

Expand Down Expand Up @@ -174,5 +175,13 @@ Status ParquetFileFormat::ScanFile(const FileSource& source,
out);
}

/// \brief Create a ParquetFragment for `source`.
///
/// \param[in] source the file source the fragment reads from
/// \param[in] opts scan options passed on to the fragment
/// \param[out] out receives the newly created fragment
/// \return always Status::OK(); no validation of `source` is performed yet
Status ParquetFileFormat::MakeFragment(const FileSource& source,
                                       std::shared_ptr<ScanOptions> opts,
                                       std::unique_ptr<DataFragment>* out) {
  // TODO(bkietz) check source.path() against IsKnownExtension etc
  // `opts` is taken by value, so hand it over instead of copying it again.
  *out = internal::make_unique<ParquetFragment>(source, std::move(opts));
  return Status::OK();
}

} // namespace dataset
} // namespace arrow
Loading

0 comments on commit 53c1e0b

Please sign in to comment.