diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp index ef5c2ce231e1..8740017555eb 100644 --- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp +++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp @@ -714,7 +714,7 @@ friend class IHTTPGateway; } size_t FillHandlers() { - const std::unique_lock lock(Sync); + const std::unique_lock lock(SyncRef()); for (auto it = Streams.cbegin(); Streams.cend() != it;) { if (const auto& stream = it->lock()) { const auto streamHandle = stream->GetHandle(); @@ -763,7 +763,7 @@ friend class IHTTPGateway; TEasyCurl::TPtr easy; long httpResponseCode = 0L; { - const std::unique_lock lock(Sync); + const std::unique_lock lock(SyncRef()); if (const auto it = Allocated.find(handle); Allocated.cend() != it) { easy = std::move(it->second); TString codeLabel; @@ -815,7 +815,7 @@ friend class IHTTPGateway; void Fail(CURLMcode result) { std::stack works; { - const std::unique_lock lock(Sync); + const std::unique_lock lock(SyncRef()); for (auto& item : Allocated) { works.emplace(std::move(item.second)); @@ -836,7 +836,7 @@ friend class IHTTPGateway; void Upload(TString url, THeaders headers, TString body, TOnResult callback, bool put, TRetryPolicy::TPtr retryPolicy) final { Rps->Inc(); - const std::unique_lock lock(Sync); + const std::unique_lock lock(SyncRef()); auto easy = TEasyCurlBuffer::Make(InFlight, DownloadedBytes, UploadedBytes, std::move(url), put ? TEasyCurl::EMethod::PUT : TEasyCurl::EMethod::POST, std::move(body), std::move(headers), 0U, 0U, std::move(callback), retryPolicy ? retryPolicy->CreateRetryState() : nullptr, InitConfig, DnsGateway.GetDNSCurlList()); Await.emplace(std::move(easy)); Wakeup(0U); @@ -845,7 +845,7 @@ friend class IHTTPGateway; void Delete(TString url, THeaders headers, TOnResult callback, TRetryPolicy::TPtr retryPolicy) final { Rps->Inc(); - const std::unique_lock lock(Sync); + const std::unique_lock lock(SyncRef()); auto easy = TEasyCurlBuffer::Make(InFlight, DownloadedBytes, UploadedBytes, std::move(url), TEasyCurl::EMethod::DELETE, 0, std::move(headers), 0U, 0U, std::move(callback), retryPolicy ? retryPolicy->CreateRetryState() : nullptr, InitConfig, DnsGateway.GetDNSCurlList()); Await.emplace(std::move(easy)); Wakeup(0U); @@ -866,7 +866,7 @@ friend class IHTTPGateway; callback(TResult(CURLE_OK, TIssues{error})); return; } - const std::unique_lock lock(Sync); + const std::unique_lock lock(SyncRef()); auto easy = TEasyCurlBuffer::Make(InFlight, DownloadedBytes, UploadedBytes, std::move(url), TEasyCurl::EMethod::GET, std::move(data), std::move(headers), offset, sizeLimit, std::move(callback), retryPolicy ? retryPolicy->CreateRetryState() : nullptr, InitConfig, DnsGateway.GetDNSCurlList()); Await.emplace(std::move(easy)); Wakeup(sizeLimit); @@ -883,13 +883,14 @@ friend class IHTTPGateway; const ::NMonitoring::TDynamicCounters::TCounterPtr& inflightCounter) final { auto stream = TEasyCurlStream::Make(InFlightStreams, DownloadedBytes, UploadedBytes, std::move(url), std::move(headers), offset, sizeLimit, std::move(onStart), std::move(onNewData), std::move(onFinish), inflightCounter, InitConfig, DnsGateway.GetDNSCurlList()); - const std::unique_lock lock(Sync); + const std::unique_lock lock(SyncRef()); const auto handle = stream->GetHandle(); TEasyCurlStream::TWeakPtr weak = stream; Streams.emplace_back(stream); Allocated.emplace(handle, std::move(stream)); Wakeup(0ULL); - return [weak](TIssue issue) { + return [weak, sync=Sync](TIssue issue) { + const std::unique_lock lock(*sync); if (const auto& stream = weak.lock()) stream->Cancel(issue); }; @@ -900,7 +901,7 @@ friend class IHTTPGateway; } void OnRetry(TEasyCurlBuffer::TPtr easy) { - const std::unique_lock lock(Sync); + const std::unique_lock lock(SyncRef()); const size_t sizeLimit = easy->GetSizeLimit(); Await.emplace(std::move(easy)); Wakeup(sizeLimit); @@ -918,6 +919,10 @@ friend class IHTTPGateway; } private: + std::mutex& SyncRef() { + return *Sync; + } + CURLM* Handle = nullptr; std::queue Await; @@ -927,7 +932,7 @@ friend class IHTTPGateway; std::unordered_map Allocated; std::priority_queue> Delayed; - std::mutex Sync; + std::shared_ptr Sync = std::make_shared(); std::thread Thread; std::atomic IsStopped = false; diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index b1ed36476e8f..85733ae37295 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -166,7 +166,11 @@ struct TReadSpec { NDB::ColumnsWithTypeAndName CHColumns; std::shared_ptr ArrowSchema; NDB::FormatSettings Settings; - TString Format, Compression; + // It's very important to keep here std::string instead of TString + // because of the cast from TString to std::string is using the MutRef (it isn't thread-safe). + // This behaviour can be found in the getInputFormat call + std::string Format; + TString Compression; ui64 SizeLimit = 0; ui32 BlockLengthPosition = 0; std::vector ColumnReorder; @@ -1367,12 +1371,7 @@ class TS3StreamReadActor : public TActorBootstrapped, public DecodedChunkSizeHist, HttpInflightSize, HttpDataRps, - DeferredQueueSize, - ReadSpec->Format, - ReadSpec->Compression, - ReadSpec->ArrowSchema, - ReadSpec->RowSpec, - ReadSpec->Settings + DeferredQueueSize ); if (!UseRuntimeListing) { diff --git a/ydb/library/yql/providers/s3/common/source_context.h b/ydb/library/yql/providers/s3/common/source_context.h index 3cfc5aa7dc73..7b39d88ce970 100644 --- a/ydb/library/yql/providers/s3/common/source_context.h +++ b/ydb/library/yql/providers/s3/common/source_context.h @@ -34,12 +34,7 @@ struct TSourceContext { , NMonitoring::THistogramPtr decodedChunkSizeHist , NMonitoring::TDynamicCounters::TCounterPtr httpInflightSize , NMonitoring::TDynamicCounters::TCounterPtr httpDataRps - , NMonitoring::TDynamicCounters::TCounterPtr deferredQueueSize - , const TString format - , const TString compression - , std::shared_ptr schema - , std::unordered_map> rowTypes - , NDB::FormatSettings settings) + , NMonitoring::TDynamicCounters::TCounterPtr deferredQueueSize) : SourceId(sourceId) , Limit(limit) , ActorSystem(actorSystem) @@ -54,11 +49,6 @@ struct TSourceContext { , HttpInflightSize(httpInflightSize) , HttpDataRps(httpDataRps) , DeferredQueueSize(deferredQueueSize) - , Format(format) - , Compression(compression) - , Schema(schema) - , RowTypes(rowTypes) - , Settings(settings) { } @@ -105,11 +95,6 @@ struct TSourceContext { NMonitoring::TDynamicCounters::TCounterPtr HttpInflightSize; NMonitoring::TDynamicCounters::TCounterPtr HttpDataRps; NMonitoring::TDynamicCounters::TCounterPtr DeferredQueueSize; - const TString Format; - const TString Compression; - std::shared_ptr Schema; - std::unordered_map> RowTypes; - NDB::FormatSettings Settings; private: std::atomic_uint64_t Value; std::mutex Mutex;