Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add local kmeans actor scan #8756

Merged
merged 12 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions ydb/core/protos/out/out.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
#include <ydb/public/api/protos/ydb_table.pb.h>

#include <ydb/core/protos/blobstorage.pb.h>
#include <ydb/core/protos/blobstorage_vdisk_internal.pb.h>
#include <ydb/core/protos/blobstorage_vdisk_config.pb.h>
Expand Down Expand Up @@ -254,6 +252,10 @@ Y_DECLARE_OUT_SPEC(, NKikimrStat::TEvStatisticsResponse::EStatus, stream, value)
stream << NKikimrStat::TEvStatisticsResponse::EStatus_Name(value);
}

Y_DECLARE_OUT_SPEC(, Ydb::Table::IndexBuildState_State, stream, value) {
stream << IndexBuildState_State_Name(value);
Y_DECLARE_OUT_SPEC(, NKikimrIndexBuilder::EBuildStatus, stream, value) {
stream << NKikimrIndexBuilder::EBuildStatus_Name(value);
}

Y_DECLARE_OUT_SPEC(, NKikimrTxDataShard::TEvLocalKMeansRequest_EState, stream, value) {
stream << NKikimrTxDataShard::TEvLocalKMeansRequest_EState_Name(value);
}
67 changes: 67 additions & 0 deletions ydb/core/protos/tx_datashard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import "ydb/core/protos/subdomains.proto";
import "ydb/core/protos/query_stats.proto";
import "ydb/public/api/protos/ydb_issue_message.proto";
import "ydb/public/api/protos/ydb_status_codes.proto";
import "ydb/public/api/protos/ydb_table.proto";
import "ydb/library/yql/dq/actors/protos/dq_events.proto";
import "ydb/library/yql/dq/actors/protos/dq_stats.proto";
import "ydb/library/yql/dq/proto/dq_tasks.proto";
Expand Down Expand Up @@ -1486,6 +1487,72 @@ message TEvSampleKResponse {
repeated bytes Rows = 11;
}

message TEvLocalKMeansRequest {
optional uint64 Id = 1;

optional uint64 TabletId = 2;
optional NKikimrProto.TPathID PathId = 3;

optional uint64 SnapshotTxId = 4;
optional uint64 SnapshotStep = 5;

optional uint64 SeqNoGeneration = 6;
optional uint64 SeqNoRound = 7;

optional Ydb.Table.VectorIndexSettings Settings = 8;

optional uint64 Seed = 9;
optional uint32 K = 10;

enum EState {
azevaykin marked this conversation as resolved.
Show resolved Hide resolved
UNSPECIFIED = 0;
SAMPLE = 1;
KMEANS = 2;
UPLOAD_MAIN_TO_TMP = 3;
UPLOAD_MAIN_TO_POSTING = 4;
UPLOAD_TMP_TO_TMP = 5;
UPLOAD_TMP_TO_POSTING = 6;
DONE = 7;
};
optional EState Upload = 11;
// State != DONE
optional EState State = 12;
// State != KMEANS || DoneRounds < NeedsRounds
optional uint32 DoneRounds = 13;
optional uint32 NeedsRounds = 14;

// id of parent cluster
optional uint32 Parent = 15;
// [Child ... Child + K] ids reserved for our clusters
optional uint32 Child = 16;

optional string LevelName = 17;
optional string PostingName = 18;

optional string EmbeddingColumn = 19;
repeated string DataColumns = 20;
}

message TEvLocalKMeansProgressResponse {
optional uint64 Id = 1;

optional uint64 TabletId = 2;
optional NKikimrProto.TPathID PathId = 3;

optional uint64 RequestSeqNoGeneration = 4;
optional uint64 RequestSeqNoRound = 5;

optional NKikimrIndexBuilder.EBuildStatus Status = 6;
repeated Ydb.Issue.IssueMessage Issues = 7;

// TODO(mbkkt) implement slow-path (reliable-path)
// optional uint64 RowsDelta = 8;
// optional uint64 BytesDelta = 9;

// optional TEvLocalKMeansRequest.EState State = 10;
// optional uint32 DoneRounds = 11;
}

message TEvCdcStreamScanRequest {
message TLimits {
optional uint32 BatchMaxBytes = 1 [default = 512000];
Expand Down
15 changes: 12 additions & 3 deletions ydb/core/tablet_flat/flat_scan_lead.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,14 @@ namespace NTable {

struct TLead {
void To(TTagsRef tags, TArrayRef<const TCell> key, ESeek seek)
{
To(key, seek);
SetTags(tags);
}

void To(TArrayRef<const TCell> key, ESeek seek)
{
Valid = true;
Tags.assign(tags.begin(), tags.end());
Relation = seek;
Key = TSerializedCellVec(key);
StopKey = { };
Expand All @@ -24,6 +29,10 @@ namespace NTable {
StopKeyInclusive = inclusive;
}

void SetTags(TTagsRef tags) {
Tags.assign(tags.begin(), tags.end());
}

explicit operator bool() const noexcept
{
return Valid;
Expand All @@ -34,12 +43,12 @@ namespace NTable {
Valid = false;
}

bool Valid = false;
ESeek Relation = ESeek::Exact;
bool Valid = false;
azevaykin marked this conversation as resolved.
Show resolved Hide resolved
bool StopKeyInclusive = true;
TVector<ui32> Tags;
TSerializedCellVec Key;
TSerializedCellVec StopKey;
bool StopKeyInclusive = true;
};

}
Expand Down
76 changes: 76 additions & 0 deletions ydb/core/tx/datashard/buffer_data.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
#include "ydb/core/scheme/scheme_tablecell.h"
#include "ydb/core/tx/datashard/upload_stats.h"
#include "ydb/core/tx/tx_proxy/upload_rows.h"

namespace NKikimr::NDataShard {

using TTypes = NTxProxy::TUploadTypes;
using TRows = NTxProxy::TUploadRows;
MBkkt marked this conversation as resolved.
Show resolved Hide resolved

class TBufferData: public IStatHolder, public TNonCopyable {
public:
TBufferData()
: Rows{std::make_shared<TRows>()}
{
}

ui64 GetRows() const override final {
return Rows->size();
}

std::shared_ptr<TRows> GetRowsData() const {
return Rows;
}

ui64 GetBytes() const override final {
return ByteSize;
}

void FlushTo(TBufferData& other) {
Y_ABORT_UNLESS(this != &other);
Y_ABORT_UNLESS(other.IsEmpty());
other.Rows.swap(Rows);
other.ByteSize = std::exchange(ByteSize, 0);
azevaykin marked this conversation as resolved.
Show resolved Hide resolved
other.LastKey = std::exchange(LastKey, {});
}

void Clear() {
Rows->clear();
ByteSize = 0;
LastKey = {};
}

void AddRow(TSerializedCellVec&& key, TSerializedCellVec&& targetPk, TString&& targetValue) {
Rows->emplace_back(std::move(targetPk), std::move(targetValue));
ByteSize += Rows->back().first.GetBuffer().size() + Rows->back().second.size();
LastKey = std::move(key);
}

bool IsEmpty() const {
return Rows->empty();
}

size_t Size() const {
return Rows->size();
}

bool IsReachLimits(const TUploadLimits& Limits) {
// TODO(mbkkt) why [0..BatchRowsLimit) but [0..BatchBytesLimit]
return Rows->size() >= Limits.BatchRowsLimit || ByteSize > Limits.BatchBytesLimit;
}

auto&& ExtractLastKey() {
return std::move(LastKey);
}

const auto& GetLastKey() const {
return LastKey;
}

private:
std::shared_ptr<TRows> Rows;
ui64 ByteSize = 0;
TSerializedCellVec LastKey;
};

}
Loading
Loading