Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge stable-24-1-16-analytics into stable-24-1 #5596

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
204d5db
[KIKIMR-21355] Fix OrderedSqlRename without pragma OrderedColumns (#3…
gridnevvvit Apr 18, 2024
6933b5a
Fix ThrottlingException raising in YMQ when doing GetQueueUrl with dr…
niksaveliev Apr 18, 2024
9c79c16
24 1 14 fixes (#3947)
alexnick88 Apr 19, 2024
1e46621
24-1-14-hotfix: Fix reordering of change records (#4087) (#4106)
CyberROFL Apr 25, 2024
27ea4c8
fix(kqp): support stream lookup backward compatibility (#4149)
ulya-sidorina Apr 26, 2024
4ff1942
Fix DataShard BuildStats error handling (#4159)
kunga Apr 27, 2024
994b7a3
Fix leak of PartitionChooserActor (#4171)
nshestakov Apr 27, 2024
5097c74
Fix race between callback and destructor (#1745) (#4224)
azevaykin May 2, 2024
218b0b6
Fix unwanted movements of follower tablets (#4236)
ijon May 2, 2024
0503c3a
fix broken counters in YMQ (#4242)
siarheivesialou May 2, 2024
145154c
Pull out elapsed metrics even if actor still work (#4244)
kruall May 2, 2024
36f1b34
Stable-24-1-analytics fixes (#4475)
ivanmorozov333 May 13, 2024
03949e9
Normalizers for YCL (#4582)
nsofya May 16, 2024
9d5fb19
Fix tables manager memory usage (#4656)
nsofya May 20, 2024
0e94ef9
Remove duplicated schema versions on tables load & Fix read checker (…
nsofya May 20, 2024
d0f2714
Remove invalid checks (#4714)
nsofya May 21, 2024
a0edd99
Fix double portion deletion attempt (#4947) (#4959)
zverevgeny May 29, 2024
05dc81b
fix walking back start of elapsing time (#4578)
kruall May 16, 2024
695cd14
24-1: Mark reenqueued records & forcibly request them (#4597)
CyberROFL May 16, 2024
0ed5591
Merge 24-1: New counter for activations (CurrentActivationTimeUsByAct…
kruall May 29, 2024
1dda217
24-1 Correctly trigger borrow compaction for shadow data (#4978)
kunga Jun 3, 2024
c3fe4d0
Merge commit 'ea2ef6975f' into stable-24-1-16-analytics
zverevgeny Jun 12, 2024
47fb4c0
fix rebase error, remuve dup
zverevgeny Jun 12, 2024
d5b2de6
Make clang-14 happy (#3389)
dcherednik Apr 2, 2024
e353a87
tune muted_ya.txt
zverevgeny Jun 16, 2024
9dc7306
tune muted_ya.txt more
zverevgeny Jun 16, 2024
ffcc7eb
tune muted_ya.txt more2
zverevgeny Jun 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
2 changes: 0 additions & 2 deletions .github/config/muted_test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ ydb-core-blobstorage-ut_blobstorage/SpaceCheckForDiskReassign::*
ydb-services-ydb-sdk_sessions_pool_ut/YdbSdkSessionsPool::StressTestSync10
ydb-tests-functional-kqp-kqp_query_session/KqpQuerySession::NoLocalAttach
ydb-core-blobstorage-ut_blobstorage/VDiskAssimilation::Test
ydb-core-tx-columnshard-ut_schema/TColumnShardTestSchema::ForgetAfterFail
ydb-core-tx-columnshard-ut_schema/TColumnShardTestSchema::RebootForgetAfterFail
ydb-library-yql-sql-pg-ut/PgSqlParsingAutoparam::AutoParamValues_DifferentTypes
ydb-core-blobstorage-ut_blobstorage/[6/10]*
ydb/core/blobstorage/ut_blobstorage/Defragmentation::DoesItWork
11 changes: 7 additions & 4 deletions .github/config/muted_ya.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,13 @@ ydb/core/kafka_proxy/ut KafkaProtocol.CreatePartitionsScenario
ydb/core/kafka_proxy/ut KafkaProtocol.ProduceScenario
ydb/core/kqp/provider/ut KikimrIcGateway.TestLoadBasicSecretValueFromExternalDataSourceMetadata
ydb/core/kqp/ut/federated_query/generic *
ydb/core/kqp/ut/olap *
ydb/core/kqp/ut/olap KqpOlapAggregations.Json_Exists
ydb/core/kqp/ut/olap KqpOlapIndexes.Indexes
ydb/core/kqp/ut/olap KqpOlapIndexes.IndexesActualization
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.*
ydb/core/kqp/ut/olap KqpOlap.ScanQueryOltpAndOlap
ydb/core/kqp/ut/olap KqpOlapStatistics.StatsUsageWithTTL
ydb/core/kqp/ut/olap KqpOlap.YqlScriptOltpAndOlap
ydb/core/kqp/ut/pg KqpPg.CreateIndex
ydb/core/kqp/ut/query KqpLimits.QueryReplySize
ydb/core/kqp/ut/query KqpQuery.QueryTimeout
Expand All @@ -32,9 +38,6 @@ ydb/core/kqp/ut/service KqpQueryService.QueryOnClosedSession
ydb/core/kqp/ut/service KqpQueryServiceScripts.ForgetScriptExecutionRace
ydb/core/kqp/ut/service KqpService.CloseSessionsWithLoad
ydb/core/kqp/ut/service [38/50]*
ydb/core/tx/columnshard/ut_schema TColumnShardTestSchema.ForgetAfterFail
ydb/core/tx/columnshard/ut_schema TColumnShardTestSchema.RebootForgetAfterFail
ydb/core/tx/columnshard/engines/ut *
ydb/core/tx/coordinator/ut Coordinator.RestoreTenantConfiguration
ydb/core/tx/datashard/ut_change_exchange Cdc.InitialScanDebezium
ydb/core/tx/replication/ydb_proxy/ut YdbProxyTests.ReadTopic
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/base/blobstorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -981,14 +981,16 @@ struct TEvBlobStorage {
bool WrittenBeyondBarrier = false; // was this blob written beyond the barrier?
mutable NLWTrace::TOrbit Orbit;
std::shared_ptr<TExecutionRelay> ExecutionRelay;
const TString StorageId;

// Builds a put result by copying every argument into the member of the same name.
// storageId is optional; when omitted it is taken from Default<TString>().
// NOTE(review): the two consecutive "ui32 groupId, ..." lines below look like the removed and
// the added version of one parameter line in this marker-less diff view; only the variant that
// ends with storageId matches the StorageId initializer further down.
TEvPutResult(NKikimrProto::EReplyStatus status, const TLogoBlobID &id, const TStorageStatusFlags statusFlags,
ui32 groupId, float approximateFreeSpaceShare)
ui32 groupId, float approximateFreeSpaceShare, const TString& storageId = Default<TString>())
: Status(status)
, Id(id)
, StatusFlags(statusFlags)
, GroupId(groupId)
, ApproximateFreeSpaceShare(approximateFreeSpaceShare)
, StorageId(storageId)
{}

TString Print(bool isFull) const {
Expand Down
18 changes: 12 additions & 6 deletions ydb/core/formats/arrow/arrow_batch_builder.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#include "arrow_batch_builder.h"
#include "switch/switch_type.h"
#include <contrib/libs/apache/arrow/cpp/src/arrow/io/memory.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/ipc/reader.h>

namespace NKikimr::NArrow {

namespace {
Expand Down Expand Up @@ -195,12 +195,18 @@ TArrowBatchBuilder::TArrowBatchBuilder(arrow::Compression::type codec, const std
WriteOptions.use_threads = false;
}

// Prepares the builder for a new column set: stores the YDB columns, derives the Arrow schema
// from them, creates the record batch builder and resets the row/byte counters.
// Returns arrow::Status::OK() on success; otherwise a status carrying the code and text of the
// step that failed (schema creation or builder creation).
// NOTE(review): this hunk comes from a diff view without +/- markers. The bool signature, the
// Make(schema, ...) call and "return status.ok();" appear to be the removed lines and are kept
// here exactly as shown.
bool TArrowBatchBuilder::Start(const std::vector<std::pair<TString, NScheme::TTypeInfo>>& ydbColumns) {
arrow::Status TArrowBatchBuilder::Start(const std::vector<std::pair<TString, NScheme::TTypeInfo>>& ydbColumns) {
YdbSchema = ydbColumns;
auto schema = MakeArrowSchema(ydbColumns, NotNullColumns);
auto status = arrow::RecordBatchBuilder::Make(schema, arrow::default_memory_pool(), RowsToReserve, &BatchBuilder);
if (!schema.ok()) {
return arrow::Status::FromArgs(schema.status().code(), "Cannot make arrow schema: ", schema.status().ToString());
}
auto status = arrow::RecordBatchBuilder::Make(*schema, arrow::default_memory_pool(), RowsToReserve, &BatchBuilder);
NumRows = NumBytes = 0;
return status.ok();
if (!status.ok()) {
// Use the builder's own failure code. The schema was verified to be OK above, so its code
// would be StatusCode::OK here, and Arrow does not allow an OK status to carry a message.
return arrow::Status::FromArgs(status.code(), "Cannot make arrow builder: ", status.ToString());
}
return arrow::Status::OK();
}

void TArrowBatchBuilder::AppendCell(const TCell& cell, ui32 colNum) {
Expand Down Expand Up @@ -259,7 +265,7 @@ void TArrowBatchBuilder::ReserveData(ui32 columnNo, size_t size) {
Y_ABORT_UNLESS(columnNo < YdbSchema.size());
auto type = YdbSchema[columnNo].second;

SwitchYqlTypeToArrowType(type, [&](const auto& type) {
Y_ABORT_UNLESS(SwitchYqlTypeToArrowType(type, [&](const auto& type) {
using TWrap = std::decay_t<decltype(type)>;
using TBuilder = typename arrow::TypeTraits<typename TWrap::T>::BuilderType;

Expand All @@ -270,7 +276,7 @@ void TArrowBatchBuilder::ReserveData(ui32 columnNo, size_t size) {
Y_ABORT_UNLESS(status.ok());
}
return true;
});
}));
}

std::shared_ptr<arrow::RecordBatch> TArrowBatchBuilder::FlushBatch(bool reinitialize) {
Expand Down
10 changes: 7 additions & 3 deletions ydb/core/formats/arrow/arrow_batch_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "arrow_helpers.h"
#include <ydb/core/formats/factory.h>
#include <ydb/core/scheme/scheme_tablecell.h>
#include <ydb/library/conclusion/status.h>

namespace NKikimr::NArrow {

Expand Down Expand Up @@ -155,8 +156,11 @@ class TArrowBatchBuilder : public NKikimr::IBlockBuilder {
ui64 maxRowsInBlock, ui64 maxBytesInBlock, TString& err) override {
Y_UNUSED(maxRowsInBlock);
Y_UNUSED(maxBytesInBlock);
Y_UNUSED(err);
return Start(columns);
const auto result = Start(columns);
if (!result.ok()) {
err = result.ToString();
}
return result.ok();
}

void AddRow(const NKikimr::TDbTupleRef& key, const NKikimr::TDbTupleRef& value) override;
Expand All @@ -175,7 +179,7 @@ class TArrowBatchBuilder : public NKikimr::IBlockBuilder {
return NumBytes;
}

bool Start(const std::vector<std::pair<TString, NScheme::TTypeInfo>>& columns);
arrow::Status Start(const std::vector<std::pair<TString, NScheme::TTypeInfo>>& columns);
std::shared_ptr<arrow::RecordBatch> FlushBatch(bool reinitialize);
std::shared_ptr<arrow::RecordBatch> GetBatch() const { return Batch; }

Expand Down
30 changes: 13 additions & 17 deletions ydb/core/formats/arrow/arrow_filter.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
#include "arrow_filter.h"
#include "switch_type.h"
#include "common/container.h"
#include "common/adapter.h"

#include <contrib/libs/apache/arrow/cpp/src/arrow/array/builder_primitive.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/chunked_array.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/compute/api_vector.h>
Expand Down Expand Up @@ -307,7 +310,7 @@ NKikimr::NArrow::TColumnFilter TColumnFilter::MakePredicateFilter(const arrow::D
return NArrow::TColumnFilter(std::move(bits));
}

template <arrow::Datum::Kind kindExpected, class TData>
template <class TData>
bool ApplyImpl(const TColumnFilter& filter, std::shared_ptr<TData>& batch, const std::optional<ui32> startPos, const std::optional<ui32> count) {
if (!batch || !batch->num_rows()) {
return false;
Expand All @@ -322,33 +325,26 @@ bool ApplyImpl(const TColumnFilter& filter, std::shared_ptr<TData>& batch, const
}
}
if (filter.IsTotalDenyFilter()) {
batch = batch->Slice(0, 0);
batch = NAdapter::TDataBuilderPolicy<TData>::GetEmptySame(batch);
return true;
}
if (filter.IsTotalAllowFilter()) {
return true;
}
auto res = arrow::compute::Filter(batch, filter.BuildArrowFilter(batch->num_rows(), startPos, count));
Y_VERIFY_S(res.ok(), res.status().message());
Y_ABORT_UNLESS((*res).kind() == kindExpected);
if constexpr (kindExpected == arrow::Datum::TABLE) {
batch = (*res).table();
return batch->num_rows();
}
if constexpr (kindExpected == arrow::Datum::RECORD_BATCH) {
batch = (*res).record_batch();
return batch->num_rows();
}
AFL_VERIFY(false);
return false;
batch = NAdapter::TDataBuilderPolicy<TData>::ApplyArrowFilter(batch, filter.BuildArrowFilter(batch->num_rows(), startPos, count));
return batch->num_rows();
}

// TGeneralContainer overload: applies this filter to the container in place by delegating to
// ApplyImpl and hands back whatever ApplyImpl reports.
bool TColumnFilter::Apply(std::shared_ptr<TGeneralContainer>& batch, const std::optional<ui32> startPos, const std::optional<ui32> count) const {
    const TColumnFilter& self = *this;
    const bool applied = ApplyImpl(self, batch, startPos, count);
    return applied;
}

// arrow::Table overload: forwards to ApplyImpl with this filter and returns its result.
// NOTE(review): two return lines are shown because this diff view has no +/- markers; the call
// with the explicit arrow::Datum::TABLE argument is presumably the removed one.
bool TColumnFilter::Apply(std::shared_ptr<arrow::Table>& batch, const std::optional<ui32> startPos, const std::optional<ui32> count) const {
return ApplyImpl<arrow::Datum::TABLE>(*this, batch, startPos, count);
return ApplyImpl(*this, batch, startPos, count);
}

// arrow::RecordBatch overload: forwards to ApplyImpl with this filter and returns its result.
// NOTE(review): two return lines are shown because this diff view has no +/- markers; the call
// with the explicit arrow::Datum::RECORD_BATCH argument is presumably the removed one.
bool TColumnFilter::Apply(std::shared_ptr<arrow::RecordBatch>& batch, const std::optional<ui32> startPos, const std::optional<ui32> count) const {
return ApplyImpl<arrow::Datum::RECORD_BATCH>(*this, batch, startPos, count);
return ApplyImpl(*this, batch, startPos, count);
}

void TColumnFilter::Apply(const ui32 expectedRecordsCount, std::vector<arrow::Datum*>& datums) const {
Expand Down
7 changes: 7 additions & 0 deletions ydb/core/formats/arrow/arrow_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

namespace NKikimr::NArrow {

class TGeneralContainer;

enum class ECompareType {
LESS = 1,
LESS_OR_EQUAL,
Expand Down Expand Up @@ -62,6 +64,10 @@ class TColumnFilter {
return Filter.capacity() * sizeof(ui32) + Count * sizeof(bool);
}

// Predicts, in bytes, the memory needed for a filter over recordsCount records: one ui32 plus
// one bool per record, doubled (the factor 2 is attributed to capacity).
// The result is computed entirely in 64 bits, so it stays correct for every ui32 input.
static ui64 GetPredictedMemorySize(const ui32 recordsCount) {
    // Widen before multiplying: "2 * recordsCount" in 32-bit arithmetic wraps for counts >= 2^31.
    const ui64 reservedRecords = static_cast<ui64>(recordsCount) * 2 /* capacity */;
    return reservedRecords * (sizeof(ui32) + sizeof(bool));
}

class TIterator {
private:
i64 InternalPosition = 0;
Expand Down Expand Up @@ -172,6 +178,7 @@ class TColumnFilter {
// It makes a filter using composite predicate
static TColumnFilter MakePredicateFilter(const arrow::Datum& datum, const arrow::Datum& border, ECompareType compareType);

bool Apply(std::shared_ptr<TGeneralContainer>& batch, const std::optional<ui32> startPos = {}, const std::optional<ui32> count = {}) const;
bool Apply(std::shared_ptr<arrow::Table>& batch, const std::optional<ui32> startPos = {}, const std::optional<ui32> count = {}) const;
bool Apply(std::shared_ptr<arrow::RecordBatch>& batch, const std::optional<ui32> startPos = {}, const std::optional<ui32> count = {}) const;
void Apply(const ui32 expectedRecordsCount, std::vector<arrow::Datum*>& datums) const;
Expand Down
Loading
Loading