Skip to content

Commit

Permalink
AsyncDecoding and HttpGateway have been fixed (ydb-platform#9118)
Browse files Browse the repository at this point in the history
  • Loading branch information
dorooleg authored and Oleg Doronin committed Sep 12, 2024
1 parent 192f29c commit 98d91a2
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 33 deletions.
25 changes: 15 additions & 10 deletions ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -714,7 +714,7 @@ friend class IHTTPGateway;
}

size_t FillHandlers() {
const std::unique_lock lock(Sync);
const std::unique_lock lock(SyncRef());
for (auto it = Streams.cbegin(); Streams.cend() != it;) {
if (const auto& stream = it->lock()) {
const auto streamHandle = stream->GetHandle();
Expand Down Expand Up @@ -763,7 +763,7 @@ friend class IHTTPGateway;
TEasyCurl::TPtr easy;
long httpResponseCode = 0L;
{
const std::unique_lock lock(Sync);
const std::unique_lock lock(SyncRef());
if (const auto it = Allocated.find(handle); Allocated.cend() != it) {
easy = std::move(it->second);
TString codeLabel;
Expand Down Expand Up @@ -815,7 +815,7 @@ friend class IHTTPGateway;
void Fail(CURLMcode result) {
std::stack<TEasyCurl::TPtr> works;
{
const std::unique_lock lock(Sync);
const std::unique_lock lock(SyncRef());

for (auto& item : Allocated) {
works.emplace(std::move(item.second));
Expand All @@ -836,7 +836,7 @@ friend class IHTTPGateway;
void Upload(TString url, THeaders headers, TString body, TOnResult callback, bool put, TRetryPolicy::TPtr retryPolicy) final {
Rps->Inc();

const std::unique_lock lock(Sync);
const std::unique_lock lock(SyncRef());
auto easy = TEasyCurlBuffer::Make(InFlight, DownloadedBytes, UploadedBytes, std::move(url), put ? TEasyCurl::EMethod::PUT : TEasyCurl::EMethod::POST, std::move(body), std::move(headers), 0U, 0U, std::move(callback), retryPolicy ? retryPolicy->CreateRetryState() : nullptr, InitConfig, DnsGateway.GetDNSCurlList());
Await.emplace(std::move(easy));
Wakeup(0U);
Expand All @@ -845,7 +845,7 @@ friend class IHTTPGateway;
void Delete(TString url, THeaders headers, TOnResult callback, TRetryPolicy::TPtr retryPolicy) final {
Rps->Inc();

const std::unique_lock lock(Sync);
const std::unique_lock lock(SyncRef());
auto easy = TEasyCurlBuffer::Make(InFlight, DownloadedBytes, UploadedBytes, std::move(url), TEasyCurl::EMethod::DELETE, 0, std::move(headers), 0U, 0U, std::move(callback), retryPolicy ? retryPolicy->CreateRetryState() : nullptr, InitConfig, DnsGateway.GetDNSCurlList());
Await.emplace(std::move(easy));
Wakeup(0U);
Expand All @@ -866,7 +866,7 @@ friend class IHTTPGateway;
callback(TResult(CURLE_OK, TIssues{error}));
return;
}
const std::unique_lock lock(Sync);
const std::unique_lock lock(SyncRef());
auto easy = TEasyCurlBuffer::Make(InFlight, DownloadedBytes, UploadedBytes, std::move(url), TEasyCurl::EMethod::GET, std::move(data), std::move(headers), offset, sizeLimit, std::move(callback), retryPolicy ? retryPolicy->CreateRetryState() : nullptr, InitConfig, DnsGateway.GetDNSCurlList());
Await.emplace(std::move(easy));
Wakeup(sizeLimit);
Expand All @@ -883,13 +883,14 @@ friend class IHTTPGateway;
const ::NMonitoring::TDynamicCounters::TCounterPtr& inflightCounter) final
{
auto stream = TEasyCurlStream::Make(InFlightStreams, DownloadedBytes, UploadedBytes, std::move(url), std::move(headers), offset, sizeLimit, std::move(onStart), std::move(onNewData), std::move(onFinish), inflightCounter, InitConfig, DnsGateway.GetDNSCurlList());
const std::unique_lock lock(Sync);
const std::unique_lock lock(SyncRef());
const auto handle = stream->GetHandle();
TEasyCurlStream::TWeakPtr weak = stream;
Streams.emplace_back(stream);
Allocated.emplace(handle, std::move(stream));
Wakeup(0ULL);
return [weak](TIssue issue) {
return [weak, sync=Sync](TIssue issue) {
const std::unique_lock lock(*sync);
if (const auto& stream = weak.lock())
stream->Cancel(issue);
};
Expand All @@ -900,7 +901,7 @@ friend class IHTTPGateway;
}

void OnRetry(TEasyCurlBuffer::TPtr easy) {
const std::unique_lock lock(Sync);
const std::unique_lock lock(SyncRef());
const size_t sizeLimit = easy->GetSizeLimit();
Await.emplace(std::move(easy));
Wakeup(sizeLimit);
Expand All @@ -918,6 +919,10 @@ friend class IHTTPGateway;
}

private:
std::mutex& SyncRef() {
return *Sync;
}

CURLM* Handle = nullptr;

std::queue<TEasyCurlBuffer::TPtr> Await;
Expand All @@ -927,7 +932,7 @@ friend class IHTTPGateway;
std::unordered_map<CURL*, TEasyCurl::TPtr> Allocated;
std::priority_queue<std::pair<TInstant, TEasyCurlBuffer::TPtr>> Delayed;

std::mutex Sync;
std::shared_ptr<std::mutex> Sync = std::make_shared<std::mutex>();
std::thread Thread;
std::atomic<bool> IsStopped = false;

Expand Down
13 changes: 6 additions & 7 deletions ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,11 @@ struct TReadSpec {
NDB::ColumnsWithTypeAndName CHColumns;
std::shared_ptr<arrow::Schema> ArrowSchema;
NDB::FormatSettings Settings;
TString Format, Compression;
// It's very important to keep here std::string instead of TString
// because of the cast from TString to std::string is using the MutRef (it isn't thread-safe).
// This behaviour can be found in the getInputFormat call
std::string Format;
TString Compression;
ui64 SizeLimit = 0;
ui32 BlockLengthPosition = 0;
std::vector<ui32> ColumnReorder;
Expand Down Expand Up @@ -1367,12 +1371,7 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
DecodedChunkSizeHist,
HttpInflightSize,
HttpDataRps,
DeferredQueueSize,
ReadSpec->Format,
ReadSpec->Compression,
ReadSpec->ArrowSchema,
ReadSpec->RowSpec,
ReadSpec->Settings
DeferredQueueSize
);

if (!UseRuntimeListing) {
Expand Down
17 changes: 1 addition & 16 deletions ydb/library/yql/providers/s3/common/source_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,7 @@ struct TSourceContext {
, NMonitoring::THistogramPtr decodedChunkSizeHist
, NMonitoring::TDynamicCounters::TCounterPtr httpInflightSize
, NMonitoring::TDynamicCounters::TCounterPtr httpDataRps
, NMonitoring::TDynamicCounters::TCounterPtr deferredQueueSize
, const TString format
, const TString compression
, std::shared_ptr<arrow::Schema> schema
, std::unordered_map<TStringBuf, NKikimr::NMiniKQL::TType*, THash<TStringBuf>> rowTypes
, NDB::FormatSettings settings)
, NMonitoring::TDynamicCounters::TCounterPtr deferredQueueSize)
: SourceId(sourceId)
, Limit(limit)
, ActorSystem(actorSystem)
Expand All @@ -54,11 +49,6 @@ struct TSourceContext {
, HttpInflightSize(httpInflightSize)
, HttpDataRps(httpDataRps)
, DeferredQueueSize(deferredQueueSize)
, Format(format)
, Compression(compression)
, Schema(schema)
, RowTypes(rowTypes)
, Settings(settings)
{
}

Expand Down Expand Up @@ -105,11 +95,6 @@ struct TSourceContext {
NMonitoring::TDynamicCounters::TCounterPtr HttpInflightSize;
NMonitoring::TDynamicCounters::TCounterPtr HttpDataRps;
NMonitoring::TDynamicCounters::TCounterPtr DeferredQueueSize;
const TString Format;
const TString Compression;
std::shared_ptr<arrow::Schema> Schema;
std::unordered_map<TStringBuf, NKikimr::NMiniKQL::TType*, THash<TStringBuf>> RowTypes;
NDB::FormatSettings Settings;
private:
std::atomic_uint64_t Value;
std::mutex Mutex;
Expand Down

0 comments on commit 98d91a2

Please sign in to comment.