Skip to content

Commit

Permalink
Merge tag 'tags/24.3.10' into stream-nb-24-3
Browse files Browse the repository at this point in the history
  • Loading branch information
dcherednik committed Oct 18, 2024
2 parents 1214afb + 41ca9ba commit 645139e
Show file tree
Hide file tree
Showing 1,787 changed files with 53,582 additions and 28,893 deletions.
20 changes: 12 additions & 8 deletions .github/actions/s3cmd/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,16 @@ inputs:
required: true
description: "s3 key secret"
s3_bucket:
required: true
required: false
description: "s3 bucket"
s3_endpoint:
required: true
description: "s3 endpoint"
folder_prefix:
required: true
required: false
description: "folder prefix"
build_preset:
required: true
required: false
description: "build preset like relwithdebinfo"
runs:
using: "composite"
Expand All @@ -35,7 +35,14 @@ runs:
host_base = storage.yandexcloud.net
host_bucket = %(bucket)s.storage.yandexcloud.net
EOF
env:
s3_key_id: ${{ inputs.s3_key_id }}
s3_secret_access_key: ${{ inputs.s3_key_secret }}

- name: export s3 path variables
shell: bash
if: inputs.build_preset
run: |
folder="${{ runner.arch == 'X64' && 'x86-64' || runner.arch == 'ARM64' && 'arm64' || 'unknown' }}"
BUILD_PRESET="${{ inputs.build_preset }}"
Expand All @@ -57,7 +64,4 @@ runs:
echo "S3_BUCKET_PATH=s3://${{ inputs.s3_bucket }}/${{ github.repository }}/${{github.workflow}}/${{ github.run_id }}/${{ inputs.folder_prefix }}${folder}" >> $GITHUB_ENV
echo "S3_URL_PREFIX=${{ inputs.s3_endpoint }}/${{ inputs.s3_bucket }}/${{ github.repository }}/${{ github.workflow }}/${{ github.run_id }}/${{ inputs.folder_prefix }}${folder}" >> $GITHUB_ENV
echo "S3_TEST_ARTIFACTS_BUCKET_PATH=s3://${{ inputs.s3_bucket }}/testing_out_stuff/${{ github.repository }}/${{github.workflow}}/${{ github.run_id }}/${{ inputs.folder_prefix }}${folder}" >> $GITHUB_ENV
echo "S3_TEST_ARTIFACTS_URL_PREFIX=${{ inputs.s3_endpoint }}/${{ inputs.s3_bucket }}/testing_out_stuff/${{ github.repository }}/${{ github.workflow }}/${{ github.run_id }}/${{ inputs.folder_prefix }}${folder}" >> $GITHUB_ENV
env:
s3_key_id: ${{ inputs.s3_key_id }}
s3_secret_access_key: ${{ inputs.s3_key_secret }}
echo "S3_TEST_ARTIFACTS_URL_PREFIX=${{ inputs.s3_endpoint }}/${{ inputs.s3_bucket }}/testing_out_stuff/${{ github.repository }}/${{ github.workflow }}/${{ github.run_id }}/${{ inputs.folder_prefix }}${folder}" >> $GITHUB_ENV
14 changes: 7 additions & 7 deletions .github/config/muted_ya.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,15 @@ ydb/core/keyvalue/ut_trace TKeyValueTracingTest.*
ydb/core/kqp/provider/ut KikimrIcGateway.TestLoadBasicSecretValueFromExternalDataSourceMetadata
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.*
ydb/core/kqp/ut/olap KqpOlapStatistics.StatsUsageWithTTL
ydb/core/kqp/ut/olap KqpOlapAggregations.Aggregation_ResultCountAll_FilterL
ydb/core/kqp/ut/olap KqpOlapWrite.WriteDeleteCleanGC
ydb/core/kqp/ut/pg KqpPg.CreateIndex
ydb/core/kqp/ut/tx KqpLocksTricky.TestNoLocksIssueInteractiveTx+withSink
ydb/core/kqp/ut/tx KqpLocksTricky.TestNoLocksIssue+withSink
ydb/core/kqp/ut/tx KqpSnapshotRead.ReadOnlyTxWithIndexCommitsOnConcurrentWrite+withSink
ydb/core/kqp/ut/tx KqpSinkTx.InvalidateOnError
ydb/core/kqp/ut/query KqpLimits.QueryReplySize
ydb/core/kqp/ut/query KqpQuery.QueryTimeout
ydb/core/kqp/ut/scan KqpRequestContext.TraceIdInErrorMessage
ydb/core/kqp/ut/scheme KqpOlapScheme.TenThousandColumns
ydb/core/kqp/ut/scheme KqpOlap.OlapRead_GenericQuerys
ydb/core/kqp/ut/scheme KqpOlap.OlapRead_StreamGenericQuery
ydb/core/kqp/ut/scheme KqpOlap.OlapRead_UsesGenericQueryOnJoinWithDataShardTable
ydb/core/kqp/ut/scheme KqpOlapScheme.DropTable
ydb/core/kqp/ut/scheme KqpScheme.AlterAsyncReplication
ydb/core/kqp/ut/scheme KqpScheme.QueryWithAlter
ydb/core/kqp/ut/scheme [14/50]*
Expand All @@ -31,6 +29,7 @@ ydb/core/kqp/ut/service KqpQueryService.QueryOnClosedSession
ydb/core/kqp/ut/service KqpService.CloseSessionsWithLoad
ydb/core/kqp/ut/service [38/50]*
ydb/core/kqp/ut/service KqpQueryService.TableSink_OltpUpdate
ydb/core/kqp/ut/service KqpQueryService.TableSink_OltpReplace+HasSecondaryIndex
ydb/core/persqueue/ut [37/40] chunk chunk
ydb/core/persqueue/ut [38/40] chunk chunk
ydb/core/persqueue/ut TPQTest.*DirectRead*
Expand Down Expand Up @@ -97,7 +96,6 @@ ydb/tests/fq/yds *
ydb/tests/fq/control_plane_storage *
ydb/tests/functional/audit *
ydb/tests/functional/blobstorage test_replication.py.TestReplicationAfterNodesRestart.test_replication*
ydb/tests/functional/clickbench test.py.test_plans[column]
ydb/tests/functional/kqp/kqp_indexes ConsistentIndexRead.InteractiveTx
ydb/tests/functional/kqp/kqp_query_session KqpQuerySession.NoLocalAttach
ydb/tests/functional/restarts test_restarts.py.*
Expand All @@ -107,4 +105,6 @@ ydb/tests/functional/tenants test_storage_config.py.TestStorageConfig.*
ydb/tests/functional/tenants test_tenants.py.*
ydb/tests/functional/ydb_cli test_ydb_impex.py.TestImpex.test_big_dataset*
ydb/tests/tools/pq_read/test test_timeout.py.TestTimeout.test_timeout
ydb/core/kqp/ut/olap KqpDecimalColumnShard.TestFilterCompare
ydb/core/kqp/ut/olap KqpDecimalColumnShard.TestAggregation
ydb/tests/functional/rename [test_rename.py */10] chunk chunk
54 changes: 54 additions & 0 deletions .github/workflows/nightly_build.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
name: Nightly-Build # workflow used to upload built binaries to s3
on:
workflow_dispatch:
inputs:
runner_label:
type: string
default: "auto-provisioned"
description: "runner label"
commit_sha:
type: string
default: ""
jobs:
build_and_test:
strategy:
fail-fast: false
matrix:
build_preset: ["relwithdebinfo", "release-asan"]
runs-on: [ self-hosted, auto-provisioned, "${{ format('build-preset-{0}', matrix.build_preset) }}" ]
name: Build and test ${{ matrix.build_preset }}
steps:
- name: Checkout
uses: actions/checkout@v4
with:
ref: ${{ inputs.commit_sha }}
fetch-depth: 2
- name: Setup ydb access
uses: ./.github/actions/setup_ci_ydb_service_account_key_file_credentials
with:
ci_ydb_service_account_key_file_credentials: ${{ secrets.CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS }}
- name: Build and test
uses: ./.github/actions/build_and_test_ya
with:
build_preset: ${{ matrix.build_preset }}
build_target: "ydb/apps/ydbd"
increment: false
run_tests: false
put_build_results_to_cache: false
secs: ${{ format('{{"TESTMO_TOKEN2":"{0}","AWS_KEY_ID":"{1}","AWS_KEY_VALUE":"{2}","REMOTE_CACHE_USERNAME":"{3}","REMOTE_CACHE_PASSWORD":"{4}"}}',
secrets.TESTMO_TOKEN2, secrets.AWS_KEY_ID, secrets.AWS_KEY_VALUE, secrets.REMOTE_CACHE_USERNAME, secrets.REMOTE_CACHE_PASSWORD ) }}
vars: ${{ format('{{"AWS_BUCKET":"{0}","AWS_ENDPOINT":"{1}","REMOTE_CACHE_URL":"{2}","TESTMO_URL":"{3}","TESTMO_PROJECT_ID":"{4}"}}',
vars.AWS_BUCKET, vars.AWS_ENDPOINT, vars.REMOTE_CACHE_URL_YA, vars.TESTMO_URL, vars.TESTMO_PROJECT_ID ) }}
- name: Setup s3cmd
uses: ./.github/actions/s3cmd
with:
s3_bucket: "ydb-builds"
s3_endpoint: ${{ vars.AWS_ENDPOINT }}
s3_key_id: ${{ secrets.AWS_KEY_ID }}
s3_key_secret: ${{ secrets.AWS_KEY_VALUE }}

- name: sync results to s3 and publish links
shell: bash
run: |
set -x
s3cmd sync --follow-symlinks --acl-public --no-progress --stats --no-check-md5 "ydb/apps/ydbd/ydbd" "s3://ydb-builds/${{ github.ref_name }}/${{ matrix.build_preset }}/ydbd" -d
20 changes: 11 additions & 9 deletions ydb/core/base/board_lookup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ namespace NKikimr {
class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> {
const TString Path;
const TActorId Owner;
const ui64 Cookie;
const EBoardLookupMode Mode;
const bool Subscriber;
TBoardRetrySettings BoardRetrySettings;
Expand Down Expand Up @@ -111,12 +112,12 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> {
void NotAvailable() {
if (CurrentStateFunc() != &TThis::StateSubscribe) {
Send(Owner, new TEvStateStorage::TEvBoardInfo(
TEvStateStorage::TEvBoardInfo::EStatus::NotAvailable, Path));
TEvStateStorage::TEvBoardInfo::EStatus::NotAvailable, Path), 0, Cookie);
} else {
Send(Owner,
new TEvStateStorage::TEvBoardInfoUpdate(
TEvStateStorage::TEvBoardInfo::EStatus::NotAvailable, Path
)
), 0, Cookie
);
}
return PassAway();
Expand All @@ -129,7 +130,7 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> {
auto reply = MakeHolder<TEvStateStorage::TEvBoardInfo>(
TEvStateStorage::TEvBoardInfo::EStatus::Ok, Path);
reply->InfoEntries = std::move(Info);
Send(Owner, std::move(reply));
Send(Owner, std::move(reply), 0, Cookie);
if (Subscriber) {
Become(&TThis::StateSubscribe);
return;
Expand Down Expand Up @@ -240,7 +241,7 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> {
auto reply = MakeHolder<TEvStateStorage::TEvBoardInfoUpdate>(
TEvStateStorage::TEvBoardInfo::EStatus::Ok, Path);
reply->Updates = { { oid, std::move(update.value()) } };
Send(Owner, std::move(reply));
Send(Owner, std::move(reply), 0, Cookie);
}
} else {
if (info.GetDropped()) {
Expand Down Expand Up @@ -308,7 +309,7 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> {
auto reply = MakeHolder<TEvStateStorage::TEvBoardInfoUpdate>(
TEvStateStorage::TEvBoardInfo::EStatus::Ok, Path);
reply->Updates = std::move(updates);
Send(Owner, std::move(reply));
Send(Owner, std::move(reply), 0, Cookie);
}
}

Expand Down Expand Up @@ -484,7 +485,7 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> {
auto reply = MakeHolder<TEvStateStorage::TEvBoardInfoUpdate>(
TEvStateStorage::TEvBoardInfo::EStatus::Ok, Path);
reply->Updates = std::move(updates);
Send(Owner, std::move(reply));
Send(Owner, std::move(reply), 0, Cookie);
}
}

Expand All @@ -495,9 +496,10 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> {

TBoardLookupActor(
const TString &path, TActorId owner, EBoardLookupMode mode,
TBoardRetrySettings boardRetrySettings)
TBoardRetrySettings boardRetrySettings, ui64 cookie = 0)
: Path(path)
, Owner(owner)
, Cookie(cookie)
, Mode(mode)
, Subscriber(Mode == EBoardLookupMode::Subscription)
, BoardRetrySettings(std::move(boardRetrySettings))
Expand Down Expand Up @@ -545,8 +547,8 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> {

IActor* CreateBoardLookupActor(
const TString &path, const TActorId &owner, EBoardLookupMode mode,
TBoardRetrySettings boardRetrySettings) {
return new TBoardLookupActor(path, owner, mode, std::move(boardRetrySettings));
TBoardRetrySettings boardRetrySettings, ui64 cookie) {
return new TBoardLookupActor(path, owner, mode, std::move(boardRetrySettings), cookie);
}

}
2 changes: 2 additions & 0 deletions ydb/core/base/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,8 @@ struct TKikimrEvents : TEvents {
ES_TX_BACKGROUND = 4256,
ES_SS_BG_TASKS = 4257,
ES_LIMITER = 4258,
//ES_MEMORY = 4259, NB. exists in main
ES_GROUPED_ALLOCATIONS_MANAGER = 4260,
};
};

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/base/statestorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ IActor* CreateStateStorageBoardReplica(const TIntrusivePtr<TStateStorageInfo> &,
IActor* CreateSchemeBoardReplica(const TIntrusivePtr<TStateStorageInfo>&, ui32);
IActor* CreateBoardLookupActor(
const TString &path, const TActorId &owner, EBoardLookupMode mode,
TBoardRetrySettings boardRetrySettings = {});
TBoardRetrySettings boardRetrySettings = {}, ui64 cookie = 0);
IActor* CreateBoardPublishActor(
const TString &path, const TString &payload, const TActorId &owner, ui32 ttlMs, bool reg,
TBoardRetrySettings boardRetrySettings = {});
Expand Down
5 changes: 3 additions & 2 deletions ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1481,7 +1481,7 @@ void TPDisk::WhiteboardReport(TWhiteboardReport &whiteboardReport) {
TGuard<TMutex> guard(StateMutex);
const ui64 totalSize = Format.DiskSize;
const ui64 availableSize = (ui64)Format.ChunkSize * Keeper.GetFreeChunkCount();

if (*Mon.PDiskBriefState != TPDiskMon::TPDisk::Error) {
*Mon.FreeSpaceBytes = availableSize;
*Mon.UsedSpaceBytes = totalSize - availableSize;
Expand All @@ -1491,7 +1491,7 @@ void TPDisk::WhiteboardReport(TWhiteboardReport &whiteboardReport) {
*Mon.UsedSpaceBytes = 32_KB;
*Mon.TotalSpaceBytes = 32_KB;
}

NKikimrWhiteboard::TPDiskStateInfo& pdiskState = reportResult->PDiskState->Record;
pdiskState.SetPDiskId(PDiskId);
pdiskState.SetPath(Cfg->GetDevicePath());
Expand All @@ -1503,6 +1503,7 @@ void TPDisk::WhiteboardReport(TWhiteboardReport &whiteboardReport) {
pdiskState.SetSystemSize(Format.ChunkSize * (Keeper.GetOwnerHardLimit(OwnerSystemLog) + Keeper.GetOwnerHardLimit(OwnerSystemReserve)));
pdiskState.SetLogUsedSize(Format.ChunkSize * (Keeper.GetOwnerHardLimit(OwnerCommonStaticLog) - Keeper.GetOwnerFree(OwnerCommonStaticLog)));
pdiskState.SetLogTotalSize(Format.ChunkSize * Keeper.GetOwnerHardLimit(OwnerCommonStaticLog));
pdiskState.SetNumActiveSlots(TotalOwners);
if (ExpectedSlotCount) {
pdiskState.SetExpectedSlotCount(ExpectedSlotCount);
}
Expand Down
16 changes: 14 additions & 2 deletions ydb/core/change_exchange/change_sender_common_ops.h
Original file line number Diff line number Diff line change
Expand Up @@ -413,8 +413,10 @@ class TBaseChangeSender {
}

TActorId GetChangeServer() const { return ChangeServer; }
void CreateSenders(const TVector<ui64>& partitionIds, bool partitioningChanged = true) {
if (partitioningChanged) {

private:
void CreateSendersImpl(const TVector<ui64>& partitionIds) {
if (partitionIds) {
CreateMissingSenders(partitionIds);
} else {
RecreateSenders(GonePartitions);
Expand All @@ -427,6 +429,16 @@ class TBaseChangeSender {
}
}

protected:
void CreateSenders(const TVector<ui64>& partitionIds) {
Y_ABORT_UNLESS(partitionIds);
CreateSendersImpl(partitionIds);
}

void CreateSenders() {
CreateSendersImpl({});
}

void KillSenders() {
for (const auto& [_, sender] : std::exchange(Senders, {})) {
if (sender.ActorId) {
Expand Down
15 changes: 15 additions & 0 deletions ydb/core/change_exchange/util.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#include "util.h"

namespace NKikimr::NChangeExchange {

TVector<ui64> MakePartitionIds(const TVector<TKeyDesc::TPartitionInfo>& partitions) {
TVector<ui64> result(::Reserve(partitions.size()));

for (const auto& partition : partitions) {
result.push_back(partition.ShardId);
}

return result;
}

}
9 changes: 9 additions & 0 deletions ydb/core/change_exchange/util.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#pragma once

#include <ydb/core/scheme/scheme_tabledefs.h>

namespace NKikimr::NChangeExchange {

TVector<ui64> MakePartitionIds(const TVector<TKeyDesc::TPartitionInfo>& partitions);

}
1 change: 1 addition & 0 deletions ydb/core/change_exchange/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ SRCS(
change_exchange.cpp
change_record.cpp
change_sender_monitoring.cpp
util.cpp
)

GENERATE_ENUM_SERIALIZATION(change_record.h)
Expand Down
29 changes: 18 additions & 11 deletions ydb/core/client/server/msgbus_server_pq_metacache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ class TPersQueueMetaCacheActor : public TActorBootstrapped<TPersQueueMetaCacheAc
req->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML);
req->Record.MutableRequest()->SetKeepSession(false);
req->Record.MutableRequest()->SetDatabase(NKikimr::NPQ::GetDatabaseFromConfig(AppData(ctx)->PQConfig));
req->Record.MutableRequest()->SetUsePublicResponseDataFormat(true);

req->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true);
req->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write();
Expand Down Expand Up @@ -274,9 +275,14 @@ class TPersQueueMetaCacheActor : public TActorBootstrapped<TPersQueueMetaCacheAc

const auto& record = ev->Get()->Record.GetRef();

Y_ABORT_UNLESS(record.GetResponse().GetResults().size() == 1);
const auto& rr = record.GetResponse().GetResults(0).GetValue().GetStruct(0);
ui64 newVersion = rr.ListSize() == 0 ? 0 : rr.GetList(0).GetStruct(0).GetOptional().GetInt64();
Y_VERIFY(record.GetResponse().YdbResultsSize() == 1);
NYdb::TResultSetParser parser(record.GetResponse().GetYdbResults(0));

ui64 newVersion = 0;
if (parser.RowsCount() != 0) {
parser.TryNextRow();
newVersion = *parser.ColumnParser(0).GetOptionalInt64();
}

LastVersionUpdate = ctx.Now();
if (newVersion > CurrentTopicsVersion || CurrentTopicsVersion == 0 || SkipVersionCheck) {
Expand All @@ -293,17 +299,18 @@ class TPersQueueMetaCacheActor : public TActorBootstrapped<TPersQueueMetaCacheAc

const auto& record = ev->Get()->Record.GetRef();

Y_ABORT_UNLESS(record.GetResponse().GetResults().size() == 1);
Y_VERIFY(record.GetResponse().YdbResultsSize() == 1);
TString path, dc;
const auto& rr = record.GetResponse().GetResults(0).GetValue().GetStruct(0);
for (const auto& row : rr.GetList()) {

path = row.GetStruct(0).GetOptional().GetText();
dc = row.GetStruct(1).GetOptional().GetText();
NYdb::TResultSetParser parser(record.GetResponse().GetYdbResults(0));
const ui32 rowCount = parser.RowsCount();
while (parser.TryNextRow()) {
path = *parser.ColumnParser(0).GetOptionalUtf8();
dc = *parser.ColumnParser(1).GetOptionalUtf8();

NewTopics.emplace_back(decltype(NewTopics)::value_type{path, dc});
}
if (rr.ListSize() > 0) {

if (rowCount > 0) {
LastTopicKey = {path, dc};
return RunQuery(EQueryType::EGetTopics, ctx);
} else {
Expand Down Expand Up @@ -710,7 +717,7 @@ class TPersQueueMetaCacheActor : public TActorBootstrapped<TPersQueueMetaCacheAc
void ProcessNodesInfoWaitersQueue(bool status, const TActorContext& ctx) {
if (DynamicNodesMapping == nullptr) {
Y_ABORT_UNLESS(!status);
DynamicNodesMapping.reset(new THashMap<ui32, ui32>);
DynamicNodesMapping.reset(new THashMap<ui32, ui32>);
}
while(!NodesMappingWaiters.empty()) {
ctx.Send(NodesMappingWaiters.front(),
Expand Down
Loading

0 comments on commit 645139e

Please sign in to comment.