Skip to content

Commit

Permalink
Fix 2110 (#2111)
Browse files Browse the repository at this point in the history
  • Loading branch information
owent authored Apr 27, 2023
1 parent 6c7f7c8 commit 6ca0d68
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ class HttpClient : public opentelemetry::ext::http::client::HttpClient
std::recursive_mutex session_ids_m_;
std::unordered_map<uint64_t, std::shared_ptr<Session>> sessions_;
std::unordered_set<uint64_t> pending_to_add_session_ids_;
std::unordered_set<uint64_t> pending_to_abort_session_ids_;
std::unordered_map<uint64_t, std::shared_ptr<Session>> pending_to_abort_sessions_;
std::unordered_map<uint64_t, HttpCurlEasyResource> pending_to_remove_session_handles_;
std::list<std::shared_ptr<Session>> pending_to_remove_sessions_;

Expand Down
50 changes: 26 additions & 24 deletions ext/src/http/client/curl/http_client_curl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -257,8 +257,8 @@ void HttpClient::CleanupSession(uint64_t session_id)
}
else if (session->IsSessionActive() && session->GetOperation())
{
// If this session is alread waiting to be removed, just wakeup background thread to call
// doRemoveSessions()
// If this session is already running, give it to the background thread for cleanup.
pending_to_abort_sessions_[session_id] = std::move(session);
wakeupBackgroundThread();
}
}
Expand Down Expand Up @@ -392,7 +392,7 @@ void HttpClient::ScheduleAddSession(uint64_t session_id)
std::lock_guard<std::recursive_mutex> lock_guard{session_ids_m_};
pending_to_add_session_ids_.insert(session_id);
pending_to_remove_session_handles_.erase(session_id);
pending_to_abort_session_ids_.erase(session_id);
pending_to_abort_sessions_.erase(session_id);
}

wakeupBackgroundThread();
Expand All @@ -401,9 +401,21 @@ void HttpClient::ScheduleAddSession(uint64_t session_id)
void HttpClient::ScheduleAbortSession(uint64_t session_id)
{
{
std::lock_guard<std::recursive_mutex> lock_guard{session_ids_m_};
pending_to_abort_session_ids_.insert(session_id);
pending_to_add_session_ids_.erase(session_id);
std::lock_guard<std::mutex> lock_guard{sessions_m_};
auto session = sessions_.find(session_id);
if (session == sessions_.end())
{
std::lock_guard<std::recursive_mutex> lock_guard{session_ids_m_};
pending_to_add_session_ids_.erase(session_id);
}
else
{
std::lock_guard<std::recursive_mutex> lock_guard{session_ids_m_};
pending_to_abort_sessions_[session_id] = std::move(session->second);
pending_to_add_session_ids_.erase(session_id);

sessions_.erase(session);
}
}

wakeupBackgroundThread();
Expand Down Expand Up @@ -472,33 +484,23 @@ bool HttpClient::doAddSessions()

bool HttpClient::doAbortSessions()
{
std::list<std::shared_ptr<Session>> abort_sessions;
std::unordered_set<uint64_t> pending_to_abort_session_ids;
std::unordered_map<uint64_t, std::shared_ptr<Session>> pending_to_abort_sessions;
{
std::lock_guard<std::recursive_mutex> session_id_lock_guard{session_ids_m_};
pending_to_abort_session_ids_.swap(pending_to_abort_session_ids);
pending_to_abort_sessions_.swap(pending_to_abort_sessions);
}

bool has_data = false;
for (auto session : pending_to_abort_sessions)
{
std::lock_guard<std::mutex> lock_guard{sessions_m_};
for (auto &session_id : pending_to_abort_session_ids)
if (!session.second)
{
auto session = sessions_.find(session_id);
if (session == sessions_.end())
{
continue;
}

abort_sessions.push_back(session->second);
continue;
}
}

bool has_data = false;
for (auto session : abort_sessions)
{
if (session->GetOperation())
if (session.second->GetOperation())
{
session->FinishOperation();
session.second->FinishOperation();
has_data = true;
}
}
Expand Down

0 comments on commit 6ca0d68

Please sign in to comment.