Skip to content

Commit

Permalink
Revert "Stable-24-1 patch for cs (ydb-platform#2142)"
Browse files Browse the repository at this point in the history
This reverts commit 288a80b.
  • Loading branch information
ivanmorozov333 committed Apr 16, 2024
1 parent e60ad02 commit 4b86e87
Show file tree
Hide file tree
Showing 141 changed files with 1,035 additions and 1,262 deletions.
10 changes: 1 addition & 9 deletions ydb/core/formats/arrow/arrow_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,7 @@ TColumnFilter TColumnFilter::CombineSequentialAnd(const TColumnFilter& extFilter
++itExt;
}
}
AFL_VERIFY(itSelf == Filter.end() && itExt == extFilter.Filter.cend());
Y_ABORT_UNLESS(itSelf == Filter.end() && itExt == extFilter.Filter.cend());
TColumnFilter result = TColumnFilter::BuildAllowFilter();
std::swap(resultFilter, result.Filter);
std::swap(curCurrent, result.LastValue);
Expand Down Expand Up @@ -611,12 +611,4 @@ std::optional<ui32> TColumnFilter::GetFilteredCount() const {
return *FilteredCount;
}

void TColumnFilter::Append(const TColumnFilter& filter) {
bool currentVal = filter.GetStartValue();
for (auto&& i : filter.Filter) {
Add(currentVal, i);
currentVal = !currentVal;
}
}

}
1 change: 0 additions & 1 deletion ydb/core/formats/arrow/arrow_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ class TColumnFilter {
FilteredCount.reset();
}
public:
void Append(const TColumnFilter& filter);
void Add(const bool value, const ui32 count = 1);
std::optional<ui32> GetFilteredCount() const;
const std::vector<bool>& BuildSimpleFilter() const;
Expand Down
37 changes: 3 additions & 34 deletions ydb/core/formats/arrow/arrow_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#include "common/validation.h"
#include "merging_sorted_input_stream.h"
#include "permutations.h"
#include "serializer/native.h"
#include "serializer/batch_only.h"
#include "serializer/abstract.h"
#include "serializer/stream.h"
#include "simple_arrays_cache.h"
Expand Down Expand Up @@ -106,7 +106,7 @@ std::shared_ptr<arrow::Schema> DeserializeSchema(const TString& str) {
}

TString SerializeBatch(const std::shared_ptr<arrow::RecordBatch>& batch, const arrow::ipc::IpcWriteOptions& options) {
return NSerialization::TNativeSerializer(options).SerializePayload(batch);
return NSerialization::TBatchPayloadSerializer(options).Serialize(batch);
}

TString SerializeBatchNoCompression(const std::shared_ptr<arrow::RecordBatch>& batch) {
Expand All @@ -117,7 +117,7 @@ TString SerializeBatchNoCompression(const std::shared_ptr<arrow::RecordBatch>& b

std::shared_ptr<arrow::RecordBatch> DeserializeBatch(const TString& blob, const std::shared_ptr<arrow::Schema>& schema)
{
auto result = NSerialization::TNativeSerializer().Deserialize(blob, schema);
auto result = NSerialization::TBatchPayloadDeserializer(schema).Deserialize(blob);
if (result.ok()) {
return *result;
} else {
Expand Down Expand Up @@ -977,35 +977,4 @@ std::shared_ptr<arrow::RecordBatch> MergeColumns(const std::vector<std::shared_p
return arrow::RecordBatch::Make(schema, *recordsCount, columns);
}

std::vector<std::shared_ptr<arrow::RecordBatch>> SliceToRecordBatches(const std::shared_ptr<arrow::Table>& t) {
std::set<ui32> splitPositions;
const ui32 numRows = t->num_rows();
for (auto&& i : t->columns()) {
ui32 pos = 0;
for (auto&& arr : i->chunks()) {
splitPositions.emplace(pos);
pos += arr->length();
}
AFL_VERIFY(pos == t->num_rows());
}
std::vector<std::vector<std::shared_ptr<arrow::Array>>> slicedData;
slicedData.resize(splitPositions.size());
std::vector<ui32> positions(splitPositions.begin(), splitPositions.end());
for (auto&& i : t->columns()) {
for (ui32 idx = 0; idx < positions.size(); ++idx) {
auto slice = i->Slice(positions[idx], ((idx + 1 == positions.size()) ? numRows : positions[idx + 1]) - positions[idx]);
AFL_VERIFY(slice->num_chunks() == 1);
slicedData[idx].emplace_back(slice->chunks().front());
}
}
std::vector<std::shared_ptr<arrow::RecordBatch>> result;
ui32 count = 0;
for (auto&& i : slicedData) {
result.emplace_back(arrow::RecordBatch::Make(t->schema(), i.front()->length(), i));
count += result.back()->num_rows();
}
AFL_VERIFY(count == t->num_rows())("count", count)("t", t->num_rows());
return result;
}

}
2 changes: 0 additions & 2 deletions ydb/core/formats/arrow/arrow_helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,6 @@ inline bool HasNulls(const std::shared_ptr<arrow::Array>& column) {
return column->null_bitmap_data();
}

std::vector<std::shared_ptr<arrow::RecordBatch>> SliceToRecordBatches(const std::shared_ptr<arrow::Table>& t);

bool ArrayScalarsEqual(const std::shared_ptr<arrow::Array>& lhs, const std::shared_ptr<arrow::Array>& rhs);
std::shared_ptr<arrow::Array> BoolVecToArray(const std::vector<bool>& vec);

Expand Down
23 changes: 23 additions & 0 deletions ydb/core/formats/arrow/compression/CMakeLists.darwin-arm64.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@

# This file was generated by the build system used internally in the Yandex monorepo.
# Only simple modifications are allowed (adding source-files to targets, adding simple properties
# like target_include_directories). These modifications will be ported to original
# ya.make files by maintainers. Any complex modifications which can't be ported back to the
# original buildsystem will not be accepted.



add_library(formats-arrow-compression)
target_link_libraries(formats-arrow-compression PUBLIC
contrib-libs-cxxsupp
yutil
libs-apache-arrow
ydb-core-protos
core-formats-arrow
ydb-library-conclusion
)
target_sources(formats-arrow-compression PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/compression/diff.cpp
${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/compression/object.cpp
${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/compression/parsing.cpp
)
23 changes: 23 additions & 0 deletions ydb/core/formats/arrow/compression/CMakeLists.darwin-x86_64.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@

# This file was generated by the build system used internally in the Yandex monorepo.
# Only simple modifications are allowed (adding source-files to targets, adding simple properties
# like target_include_directories). These modifications will be ported to original
# ya.make files by maintainers. Any complex modifications which can't be ported back to the
# original buildsystem will not be accepted.



add_library(formats-arrow-compression)
target_link_libraries(formats-arrow-compression PUBLIC
contrib-libs-cxxsupp
yutil
libs-apache-arrow
ydb-core-protos
core-formats-arrow
ydb-library-conclusion
)
target_sources(formats-arrow-compression PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/compression/diff.cpp
${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/compression/object.cpp
${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/compression/parsing.cpp
)
24 changes: 24 additions & 0 deletions ydb/core/formats/arrow/compression/CMakeLists.linux-aarch64.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@

# This file was generated by the build system used internally in the Yandex monorepo.
# Only simple modifications are allowed (adding source-files to targets, adding simple properties
# like target_include_directories). These modifications will be ported to original
# ya.make files by maintainers. Any complex modifications which can't be ported back to the
# original buildsystem will not be accepted.



add_library(formats-arrow-compression)
target_link_libraries(formats-arrow-compression PUBLIC
contrib-libs-linux-headers
contrib-libs-cxxsupp
yutil
libs-apache-arrow
ydb-core-protos
core-formats-arrow
ydb-library-conclusion
)
target_sources(formats-arrow-compression PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/compression/diff.cpp
${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/compression/object.cpp
${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/compression/parsing.cpp
)
24 changes: 24 additions & 0 deletions ydb/core/formats/arrow/compression/CMakeLists.linux-x86_64.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@

# This file was generated by the build system used internally in the Yandex monorepo.
# Only simple modifications are allowed (adding source-files to targets, adding simple properties
# like target_include_directories). These modifications will be ported to original
# ya.make files by maintainers. Any complex modifications which can't be ported back to the
# original buildsystem will not be accepted.



add_library(formats-arrow-compression)
target_link_libraries(formats-arrow-compression PUBLIC
contrib-libs-linux-headers
contrib-libs-cxxsupp
yutil
libs-apache-arrow
ydb-core-protos
core-formats-arrow
ydb-library-conclusion
)
target_sources(formats-arrow-compression PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/compression/diff.cpp
${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/compression/object.cpp
${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/compression/parsing.cpp
)
19 changes: 19 additions & 0 deletions ydb/core/formats/arrow/compression/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@

# This file was generated by the build system used internally in the Yandex monorepo.
# Only simple modifications are allowed (adding source-files to targets, adding simple properties
# like target_include_directories). These modifications will be ported to original
# ya.make files by maintainers. Any complex modifications which can't be ported back to the
# original buildsystem will not be accepted.


if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
include(CMakeLists.linux-x86_64.txt)
elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
include(CMakeLists.linux-aarch64.txt)
elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
include(CMakeLists.darwin-x86_64.txt)
elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "arm64")
include(CMakeLists.darwin-arm64.txt)
elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
include(CMakeLists.windows-x86_64.txt)
endif()
23 changes: 23 additions & 0 deletions ydb/core/formats/arrow/compression/CMakeLists.windows-x86_64.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@

# This file was generated by the build system used internally in the Yandex monorepo.
# Only simple modifications are allowed (adding source-files to targets, adding simple properties
# like target_include_directories). These modifications will be ported to original
# ya.make files by maintainers. Any complex modifications which can't be ported back to the
# original buildsystem will not be accepted.



add_library(formats-arrow-compression)
target_link_libraries(formats-arrow-compression PUBLIC
contrib-libs-cxxsupp
yutil
libs-apache-arrow
ydb-core-protos
core-formats-arrow
ydb-library-conclusion
)
target_sources(formats-arrow-compression PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/compression/diff.cpp
${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/compression/object.cpp
${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/compression/parsing.cpp
)
77 changes: 77 additions & 0 deletions ydb/core/formats/arrow/compression/diff.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
#include "diff.h"
#include "object.h"
#include "parsing.h"
#include <util/string/cast.h>

namespace NKikimr::NArrow {

NKikimrSchemeOp::TCompressionOptions TCompressionDiff::SerializeToProto() const {
NKikimrSchemeOp::TCompressionOptions result;
if (Level) {
result.SetCompressionLevel(*Level);
}
if (Codec) {
result.SetCompressionCodec(CompressionToProto(*Codec));
}
return result;
}

TConclusionStatus TCompressionDiff::DeserializeFromRequestFeatures(NYql::TFeaturesExtractor& features) {
{
auto fValue = features.Extract("COMPRESSION.TYPE");
if (fValue) {
Codec = NArrow::CompressionFromString(*fValue);
if (!Codec) {
return TConclusionStatus::Fail("cannot parse COMPRESSION.TYPE as arrow::Compression");
}
}
}
{
auto fValue = features.Extract("COMPRESSION.LEVEL");
if (fValue) {
ui32 level;
if (!TryFromString<ui32>(*fValue, level)) {
return TConclusionStatus::Fail("cannot parse COMPRESSION.LEVEL as ui32");
}
Level = level;
}
}
return TConclusionStatus::Success();
}

bool TCompressionDiff::DeserializeFromProto(const NKikimrSchemeOp::TCompressionOptions& proto) {
if (proto.HasCompressionLevel()) {
Level = proto.GetCompressionLevel();
}
if (proto.HasCompressionCodec()) {
Codec = CompressionFromProto(proto.GetCompressionCodec());
if (!Codec) {
return false;
}
}
return true;
}

NKikimr::TConclusionStatus TCompressionDiff::Apply(std::optional<TCompression>& settings) const {
if (IsEmpty()) {
return TConclusionStatus::Success();
}
TCompression merged;
if (!!settings) {
merged = *settings;
}
if (Codec) {
merged.Codec = *Codec;
}
if (Level) {
merged.Level = *Level;
}
auto validation = merged.Validate();
if (!validation) {
return validation;
}
settings = merged;
return TConclusionStatus::Success();
}

}
34 changes: 34 additions & 0 deletions ydb/core/formats/arrow/compression/diff.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#pragma once

#include <ydb/library/conclusion/status.h>
#include <ydb/library/conclusion/result.h>
#include <ydb/core/protos/flat_scheme_op.pb.h>
#include <ydb/services/metadata/abstract/request_features.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/util/compression.h>
#include <optional>
#include <map>

namespace NKikimr::NArrow {

class TCompression;

class TCompressionDiff {
private:
std::optional<arrow::Compression::type> Codec;
std::optional<int> Level;
bool IsEmpty() const {
return !Level && !Codec;
}
public:
NKikimrSchemeOp::TCompressionOptions SerializeToProto() const;
bool DeserializeFromProto(const NKikimrSchemeOp::TCompressionOptions& proto);
TConclusionStatus DeserializeFromRequestFeatures(NYql::TFeaturesExtractor& features);
const std::optional<arrow::Compression::type>& GetCodec() const {
return Codec;
}
const std::optional<int>& GetLevel() const {
return Level;
}
TConclusionStatus Apply(std::optional<TCompression>& settings) const;
};
}
Loading

0 comments on commit 4b86e87

Please sign in to comment.