Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix errors in chunked ORC writer when no tables were (successfully) written #15393

Merged
merged 8 commits into from
Apr 2, 2024
Merged
8 changes: 0 additions & 8 deletions cpp/include/cudf/io/detail/orc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,14 +124,6 @@ class writer {
* @brief Finishes the chunked/streamed write process.
*/
void close();

/**
* @brief Skip work done in `close()`; should be called if `write()` failed.
*
* Calling skip_close() prevents the writer from writing the (invalid) file footer and the
* postscript.
*/
void skip_close();
};
} // namespace orc::detail
} // namespace cudf::io
11 changes: 1 addition & 10 deletions cpp/src/io/functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -436,16 +436,7 @@ void write_orc(orc_writer_options const& options, rmm::cuda_stream_view stream)

auto writer = std::make_unique<orc::detail::writer>(
std::move(sinks[0]), options, io_detail::single_write_mode::YES, stream);
try {
writer->write(options.get_table());
} catch (...) {
// If an exception is thrown, the output is incomplete/corrupted.
// Make sure the writer will not close with such corrupted data.
// In addition, the writer may throw an exception while trying to close, which would terminate
// the process.
writer->skip_close();
throw;
}
writer->write(options.get_table());
}

/**
Expand Down
29 changes: 15 additions & 14 deletions cpp/src/io/orc/writer_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -2438,7 +2438,6 @@ writer::impl::impl(std::unique_ptr<data_sink> sink,
if (options.get_metadata()) {
_table_meta = std::make_unique<table_input_metadata>(*options.get_metadata());
}
init_state();
}

writer::impl::impl(std::unique_ptr<data_sink> sink,
Expand All @@ -2460,20 +2459,13 @@ writer::impl::impl(std::unique_ptr<data_sink> sink,
if (options.get_metadata()) {
_table_meta = std::make_unique<table_input_metadata>(*options.get_metadata());
}
init_state();
}

writer::impl::~impl() { close(); }

void writer::impl::init_state()
{
// Write file header
_out_sink->host_write(MAGIC, std::strlen(MAGIC));
}

void writer::impl::write(table_view const& input)
{
CUDF_EXPECTS(not _closed, "Data has already been flushed to out and closed");
CUDF_EXPECTS(_state != writer_state::CLOSED, "Data has already been flushed to out and closed");

if (not _table_meta) { _table_meta = make_table_meta(input); }

Expand Down Expand Up @@ -2516,6 +2508,11 @@ void writer::impl::write(table_view const& input)
}
}();

if (_state == writer_state::NO_DATA_WRITTEN) {
// Write the ORC file header if this is the first write
_out_sink->host_write(MAGIC, std::strlen(MAGIC));
}

// Compression/encoding were all successful. Now write the intermediate results.
write_orc_data_to_sink(enc_data,
segmentation,
Expand All @@ -2533,6 +2530,8 @@ void writer::impl::write(table_view const& input)

// Update file-level and compression statistics
update_statistics(orc_table.num_rows(), std::move(intermediate_stats), compression_stats);

_state = writer_state::DATA_WRITTEN;
}

void writer::impl::update_statistics(
Expand Down Expand Up @@ -2683,8 +2682,11 @@ void writer::impl::add_table_to_footer_data(orc_table_view const& orc_table,

void writer::impl::close()
{
if (_closed) { return; }
_closed = true;
if (_state != writer_state::DATA_WRITTEN) {
// writer is either closed or no data has been written
_state = writer_state::CLOSED;
return;
Comment on lines +2685 to +2688
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So no more exception throwing right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this fixes the exception (i.e. terminate) in close when footer is empty. this happens when no write calls are made with a chunked writer, and when all write calls threw.
Not sure if this answers the question.

}
PostScript ps;

if (_stats_freq != statistics_freq::STATISTICS_NONE) {
Expand Down Expand Up @@ -2769,6 +2771,8 @@ void writer::impl::close()
pbw.put_byte(ps_length);
_out_sink->host_write(pbw.data(), pbw.size());
_out_sink->flush();

_state = writer_state::CLOSED;
}

// Forward to implementation
Expand All @@ -2795,9 +2799,6 @@ writer::~writer() = default;
// Forward to implementation
void writer::write(table_view const& table) { _impl->write(table); }

// Forward to implementation
void writer::skip_close() { _impl->skip_close(); }

// Forward to implementation
void writer::close() { _impl->close(); }

Expand Down
20 changes: 9 additions & 11 deletions cpp/src/io/orc/writer_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,14 @@ struct encoded_footer_statistics {
std::vector<ColStatsBlob> file_level;
};

enum class writer_state {
NO_DATA_WRITTEN, // No table data has been written to the sink; if the writer is closed or
// destroyed in this state, it should not write the footer.
Comment on lines +231 to +232
Copy link
Contributor

@ttnghia ttnghia Mar 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But does it write the magic? Please clarify.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can't guarantee that the magic header is not written if the writer throws after the encode, e.g. while writing to the sink. In this case both the writer and the file are in invalid states (and thus we should not try to write the footer). Your strong exception guarantee only covers the encode.

DATA_WRITTEN, // At least one table has been written to the sink; when the writer is closed,
// it should write the footer.
CLOSED // Writer has been closed; no further writes are allowed.
};

/**
* @brief Implementation for ORC writer
*/
Expand Down Expand Up @@ -266,11 +274,6 @@ class writer::impl {
*/
~impl();

/**
* @brief Begins the chunked/streamed write process.
*/
void init_state();

/**
* @brief Writes a single subtable as part of a larger ORC file/table write.
*
Expand All @@ -283,11 +286,6 @@ class writer::impl {
*/
void close();

/**
* @brief Skip writing the footer when closing/deleting the writer.
*/
void skip_close() { _closed = true; }

private:
/**
* @brief Write the intermediate ORC data into the data sink.
Expand Down Expand Up @@ -363,7 +361,7 @@ class writer::impl {
Footer _footer;
Metadata _orc_meta;
persisted_statistics _persisted_stripe_statistics; // Statistics data saved between calls.
bool _closed = false; // To track if the output has been written to sink.
writer_state _state = writer_state::NO_DATA_WRITTEN;
};

} // namespace cudf::io::orc::detail
58 changes: 54 additions & 4 deletions cpp/tests/io/orc_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <cudf/concatenate.hpp>
#include <cudf/copying.hpp>
#include <cudf/detail/iterator.cuh>
#include <cudf/io/data_sink.hpp>
#include <cudf/io/orc.hpp>
#include <cudf/io/orc_metadata.hpp>
#include <cudf/strings/strings_column_view.hpp>
Expand Down Expand Up @@ -2100,8 +2101,7 @@ TEST_F(OrcWriterTest, BounceBufferBug)
auto sequence = cudf::detail::make_counting_transform_iterator(0, [](auto i) { return i % 100; });

constexpr auto num_rows = 150000;
column_wrapper<int8_t, typename decltype(sequence)::value_type> col(sequence,
sequence + num_rows);
column_wrapper<int8_t> col(sequence, sequence + num_rows);
table_view expected({col});

auto filepath = temp_env->get_temp_filepath("BounceBufferBug.orc");
Expand All @@ -2120,8 +2120,7 @@ TEST_F(OrcReaderTest, SizeTypeRowsOverflow)
static_assert(total_rows > std::numeric_limits<cudf::size_type>::max());

auto sequence = cudf::detail::make_counting_transform_iterator(0, [](auto i) { return i % 127; });
column_wrapper<int8_t, typename decltype(sequence)::value_type> col(sequence,
sequence + num_rows);
column_wrapper<int8_t> col(sequence, sequence + num_rows);
table_view chunk_table({col});

std::vector<char> out_buffer;
Expand Down Expand Up @@ -2169,4 +2168,55 @@ TEST_F(OrcReaderTest, SizeTypeRowsOverflow)
CUDF_TEST_EXPECT_TABLES_EQUAL(expected, got_with_stripe_selection->view());
}

TEST_F(OrcChunkedWriterTest, NoWriteCloseNotThrow)
{
std::vector<char> out_buffer;

cudf::io::chunked_orc_writer_options write_opts =
cudf::io::chunked_orc_writer_options::builder(cudf::io::sink_info{&out_buffer});
auto writer = cudf::io::orc_chunked_writer(write_opts);

EXPECT_NO_THROW(writer.close());
}

TEST_F(OrcChunkedWriterTest, FailedWriteCloseNotThrow)
{
// A sink that throws on write()
class throw_sink : public cudf::io::data_sink {
public:
void host_write(void const* data, size_t size) override { throw std::runtime_error("write"); }
void flush() override {}
size_t bytes_written() override { return 0; }
};

auto sequence = thrust::make_counting_iterator(0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
auto sequence = thrust::make_counting_iterator(0);
auto const sequence = thrust::make_counting_iterator(0);

column_wrapper<int8_t> col(sequence, sequence + 10);
table_view table({col});

throw_sink sink;
cudf::io::chunked_orc_writer_options write_opts =
cudf::io::chunked_orc_writer_options::builder(cudf::io::sink_info{&sink});
auto writer = cudf::io::orc_chunked_writer(write_opts);

try {
writer.write(table);
} catch (...) {
// ignore the exception; we're testing that close() doesn't throw when the only write() fails
}

EXPECT_NO_THROW(writer.close());
}

TEST_F(OrcChunkedWriterTest, NoDataInSinkWhenNoWrite)
{
std::vector<char> out_buffer;

cudf::io::chunked_orc_writer_options write_opts =
cudf::io::chunked_orc_writer_options::builder(cudf::io::sink_info{&out_buffer});
auto writer = cudf::io::orc_chunked_writer(write_opts);

EXPECT_NO_THROW(writer.close());
EXPECT_EQ(out_buffer.size(), 0);
}

CUDF_TEST_PROGRAM_MAIN()
Loading