Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix not found request for forward response #370

Merged
merged 3 commits into from
Dec 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions src/Service/ConnectionHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,9 @@ void ConnectionHandler::sendSessionResponseToClient(const Coordination::ZooKeepe

void ConnectionHandler::pushUserResponseToSendingQueue(const Coordination::ZooKeeperResponsePtr & response)
{
LOG_DEBUG(log, "Push a response of session {} to IO sending queue. {}", toHexString(session_id.load()), response->toString());
LOG_DEBUG(log, "Push response #{}#{}#{} with error '{}' to IO sending queue.", toHexString(session_id.load()),
response->xid, Coordination::toString(response->getOpNum()), errorMessage(response->error));

updateStats(response);

/// Lock to avoid a race condition that would lead to a response leak
Expand Down Expand Up @@ -697,10 +699,12 @@ void ConnectionHandler::updateStats(const Coordination::ZooKeeperResponsePtr & r
if (unlikely(elapsed > 10000))
LOG_WARNING(
log,
"Slow request detected #{}#{}#{}, time costs {}ms, please take care.",
"Slow request detected #{}#{}#{}, current_time {}, create_time {}, time costs {}ms, please take care.",
toHexString(session_id.load()),
response->xid,
Coordination::toString(response->getOpNum()),
current_time,
response->request_created_time_ms,
elapsed);

keeper_dispatcher->updateKeeperStatLatency(elapsed);
Expand Down
7 changes: 4 additions & 3 deletions src/Service/ForwardConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ void ForwardConnection::connect()
{
try
{
LOG_TRACE(log, "Try connect forward server {}", endpoint);
LOG_INFO(log, "Try connecting forward server {}", endpoint);

/// Reset the state of previous attempt.
socket = Poco::Net::StreamSocket();
Expand All @@ -45,7 +45,7 @@ void ForwardConnection::connect()
throw Exception(ErrorCodes::RAFT_FORWARD_ERROR, "Handshake with {} failed", endpoint);

connected = true;
LOG_TRACE(log, "Connect to {} success", endpoint);
LOG_INFO(log, "Connected to forward server success, peer address {}, local address {}", socket.peerAddress().toString(), socket.address().toString());
break;
}
catch (...)
Expand All @@ -71,7 +71,7 @@ void ForwardConnection::disconnect()

void ForwardConnection::send(ForwardRequestPtr request)
{
LOG_TRACE(log, "Forward request {} to endpoint {}", request->toString(), endpoint);
LOG_DEBUG(log, "Forwarding request {} to leader {}", request->toString(), endpoint);

if (unlikely(!connected))
connect();
Expand Down Expand Up @@ -127,6 +127,7 @@ void ForwardConnection::receive(ForwardResponsePtr & response)
}

response->readImpl(*in);
LOG_DEBUG(log, "Received forward response {} from {} done", response->toString(), endpoint);
}
catch (Exception & e)
{
Expand Down
3 changes: 2 additions & 1 deletion src/Service/ForwardConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ class ForwardConnection
, client_id(client_id_)
, endpoint(endpoint_)
, socket_timeout(socket_timeout_)
, log(&Poco::Logger::get("ForwardConnection"))
, log(&Poco::Logger::get(fmt::format("ForwardConnection#{}#{}#{}",
my_server_id, client_id, endpoint)))
{
}

Expand Down
3 changes: 2 additions & 1 deletion src/Service/ForwardConnectionHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ void ForwardConnectionHandler::processUserOrSessionRequest(ForwardRequestPtr req
{
ReadBufferFromMemory body(req_body_buf->begin(), req_body_buf->used());
request->readImpl(body);
LOG_DEBUG(log, "Received forward request {} from server {} client {}", request->toString(), server_id, client_id);
keeper_dispatcher->pushForwardRequest(server_id, client_id, request);
}

Expand Down Expand Up @@ -351,7 +352,7 @@ void ForwardConnectionHandler::onSocketError(const Notification &)

void ForwardConnectionHandler::sendResponse(ForwardResponsePtr response)
{
LOG_TRACE(log, "Send response {}", response->toString());
LOG_DEBUG(log, "Send forward response {} to server {} client {}.", response->toString(), server_id, client_id);

{
/// Lock to avoid a race condition that would lead to a response leak
Expand Down
7 changes: 4 additions & 3 deletions src/Service/ForwardRequest.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ struct ForwardNewSessionRequest : public ForwardRequest
String toString() const override
{
auto * request_ptr = dynamic_cast<ZooKeeperNewSessionRequest *>(request.get());
return fmt::format("#{}#{}#{}", RK::toString(forwardType()), request_ptr->internal_id, request_ptr->session_timeout_ms);
return fmt::format("#{}#{}#{}", RK::toString(forwardType()), toHexString(request_ptr->internal_id), request_ptr->session_timeout_ms);
}
};

Expand All @@ -117,7 +117,7 @@ struct ForwardUpdateSessionRequest : public ForwardRequest
String toString() const override
{
auto * request_ptr = dynamic_cast<ZooKeeperUpdateSessionRequest *>(request.get());
return fmt::format("#{}#{}#{}", RK::toString(forwardType()), request_ptr->session_id, request_ptr->session_timeout_ms);
return fmt::format("#{}#{}#{}", RK::toString(forwardType()), toHexString(request_ptr->session_id), request_ptr->session_timeout_ms);
}
};

Expand All @@ -136,7 +136,8 @@ struct ForwardUserRequest : public ForwardRequest

String toString() const override
{
return fmt::format("#{}#{}#{}", RK::toString(forwardType()), request.session_id, request.request->xid);
return fmt::format("#{}#{}#{}#{}", RK::toString(forwardType()), toHexString(request.session_id)
, request.request->xid, Coordination::toString(request.request->getOpNum()));
}
};

Expand Down
6 changes: 3 additions & 3 deletions src/Service/ForwardResponse.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <ZooKeeper/ZooKeeperIO.h>
#include <ZooKeeper/ZooKeeperCommon.h>
#include <libnuraft/async.hxx>
#include <Service/formatHex.h>

namespace RK
{
Expand Down Expand Up @@ -152,9 +153,8 @@ struct ForwardUserRequestResponse : public ForwardResponse

String toString() const override
{
return "ForwardType: " + RK::toString(forwardType()) + ", accepted " + std::to_string(accepted) + " error_code "
+ std::to_string(error_code) + " session " + std::to_string(session_id) + " xid " + std::to_string(xid) + " opnum "
+ Coordination::toString(opnum);
return fmt::format("#{}#{}#{}#{}, accepted {}, error_code {}", RK::toString(forwardType()),
toHexString(session_id), xid, Coordination::toString(opnum), accepted, error_code);
}
};

Expand Down
5 changes: 4 additions & 1 deletion src/Service/KeeperCommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,12 @@ struct RequestForSession
int64_t session_id;
Coordination::ZooKeeperRequestPtr request;

/// measured in millisecond
/// Time at which the request was created, in milliseconds since system start-up; used to calculate the request duration.
int64_t create_time{};
JackyWoo marked this conversation as resolved.
Show resolved Hide resolved

/// Time at which the leader appended the write request, in milliseconds since 1970-01-01; used for the node stat ctime and mtime.
int64_t process_time{};
JackyWoo marked this conversation as resolved.
Show resolved Hide resolved

/// for forward request
int32_t server_id{-1};
int32_t client_id{-1};
Expand Down
6 changes: 3 additions & 3 deletions src/Service/KeeperStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1291,7 +1291,7 @@ void KeeperStore::processRequest(
if (zk_request->getOpNum() == Coordination::OpNum::Heartbeat)
{
StoreRequestPtr store_request = StoreRequestFactory::instance().get(zk_request);
auto [response, _] = store_request->process(*this, zxid, session_id, request_for_session.create_time);
auto [response, _] = store_request->process(*this, zxid, session_id, request_for_session.process_time);
response->xid = zk_request->xid;
response->zxid = zxid;
LOG_TRACE(log, "heart beat for session {}", toHexString(session_id));
Expand All @@ -1300,7 +1300,7 @@ void KeeperStore::processRequest(
else if (zk_request->getOpNum() == Coordination::OpNum::SetWatches)
{
StoreRequestPtr store_request = StoreRequestFactory::instance().get(zk_request);
auto [response, _] = store_request->process(*this, zxid, session_id, request_for_session.create_time);
auto [response, _] = store_request->process(*this, zxid, session_id, request_for_session.process_time);
response->xid = zk_request->xid;
response->zxid = zxid;

Expand Down Expand Up @@ -1333,7 +1333,7 @@ void KeeperStore::processRequest(
}
else
{
response = store_request->process(*this, zxid, session_id, request_for_session.create_time).first;
response = store_request->process(*this, zxid, session_id, request_for_session.process_time).first;
}

response->request_created_time_ms = request_for_session.create_time;
Expand Down
4 changes: 2 additions & 2 deletions src/Service/KeeperUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ ptr<buffer> serializeKeeperRequest(const RequestForSession & request)
WriteBufferFromNuraftBuffer out;
writeIntBinary(request.session_id, out);
request.request->write(out);
Coordination::write(request.create_time, out);
Coordination::write(request.process_time, out);
return out.getBuffer();
}

Expand All @@ -68,7 +68,7 @@ ptr<RequestForSession> deserializeKeeperRequest(nuraft::buffer & data)
request->request->xid = xid;
request->request->readImpl(buffer);

Coordination::read(request->create_time, buffer);
Coordination::read(request->process_time, buffer);

return request;
}
Expand Down
5 changes: 5 additions & 0 deletions src/Service/KeeperUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ inline UInt64 getCurrentTimeMicroseconds()
return duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
}

/// Returns the current wall-clock time as the number of milliseconds elapsed
/// since the std::chrono::system_clock epoch.
/// Reads system_clock rather than steady_clock, so the value is a calendar
/// timestamp and not a monotonic counter.
inline UInt64 getCurrentWallTimeMilliseconds()
{
    const auto since_epoch = std::chrono::system_clock::now().time_since_epoch();
    return std::chrono::duration_cast<std::chrono::milliseconds>(since_epoch).count();
}

/// Serialize and deserialize ZooKeeper request to log
nuraft::ptr<nuraft::buffer> serializeKeeperRequest(const RequestForSession & request);
nuraft::ptr<RequestForSession> deserializeKeeperRequest(nuraft::buffer & data);
Expand Down
1 change: 1 addition & 0 deletions src/Service/RequestAccumulator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ void RequestAccumulator::run()

if (pop_success)
{
request_for_session.process_time = getCurrentWallTimeMilliseconds();
to_append_batch.emplace_back(request_for_session);

if (to_append_batch.size() >= max_batch_size)
Expand Down
20 changes: 14 additions & 6 deletions src/Service/RequestForwarder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include <Service/RequestForwarder.h>
#include <Service/Context.h>
#include <Common/setThreadName.h>
#include <fmt/ranges.h>

namespace RK
{
Expand Down Expand Up @@ -60,9 +61,8 @@ void RequestForwarder::runSend(RunnerId runner_id)

ForwardRequestPtr forward_request = ForwardRequestFactory::instance().convertFromRequest(request_for_session);
forward_request->send_time = clock::now();
forward_request_queue[runner_id]->push(forward_request);
connection->send(forward_request);

forward_request_queue[runner_id]->push(std::move(forward_request));
}
catch (...)
{
Expand Down Expand Up @@ -103,8 +103,8 @@ void RequestForwarder::runSend(RunnerId runner_id)
{
ForwardRequestPtr forward_request = std::make_shared<ForwardSyncSessionsRequest>(std::move(session_to_expiration_time));
forward_request->send_time = clock::now();
forward_request_queue[runner_id]->push(forward_request);
connection->send(forward_request);
forward_request_queue[runner_id]->push(std::move(forward_request));
}
}
else
Expand Down Expand Up @@ -156,8 +156,9 @@ void RequestForwarder::runReceive(RunnerId runner_id)
{
LOG_DEBUG(
log,
"Earliest request {} deadline {}, now {}",
"Earliest request {} in runner {} deadline {}, now {}",
earliest_request->toString(),
runner_id,
to_microseconds(earliest_request_deadline.time_since_epoch()),
to_microseconds(now.time_since_epoch()));

Expand Down Expand Up @@ -260,10 +261,17 @@ bool RequestForwarder::removeFromQueue(RunnerId runner_id, ForwardResponsePtr fo

void RequestForwarder::processResponse(RunnerId runner_id, ForwardResponsePtr forward_response_ptr)
{
bool found = removeFromQueue(runner_id, forward_response_ptr);
if (!removeFromQueue(runner_id, forward_response_ptr))
{
LOG_WARNING(log, "Not found request in runner {} for forward response {}", runner_id, forward_response_ptr->toString());
return;
}

if (!found || forward_response_ptr->accepted)
if (forward_response_ptr->accepted)
{
LOG_DEBUG(log, "Receive a forward response {} for runner {}", forward_response_ptr->toString(), runner_id);
return;
}

/// common request
LOG_ERROR(log, "Receive failed forward response {}", forward_response_ptr->toString());
Expand Down
38 changes: 18 additions & 20 deletions src/Service/RequestProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ void RequestProcessor::moveRequestToPendingQueue(RunnerId runner_id)
}
}

bool RequestProcessor::shouldProcessCommittedRequest(const RequestForSession & committed_request, bool & found_in_pending_queue)
bool RequestProcessor::shouldProcessCommittedRequest(RequestForSession & committed_request, bool & found_in_pending_queue)
{
bool has_read_request = false;
bool found_error = false;
Expand Down Expand Up @@ -160,35 +160,34 @@ bool RequestProcessor::shouldProcessCommittedRequest(const RequestForSession & c
if (first_pending_request.request->xid == committed_request.request->xid)
{
found_in_pending_queue = true;
committed_request.create_time = first_pending_request.create_time;
JackyWoo marked this conversation as resolved.
Show resolved Hide resolved
std::unique_lock lk(mutex);
if (error_request_ids.contains(first_pending_request.getRequestId()))
{
LOG_WARNING(log, "Request {} is in errors, but is successfully committed", committed_request.toSimpleString());
}
return true;
}

found_in_pending_queue = false;
/// Session of the previous committed(write) request is not same with the current,
/// which means a write_request(session_1) -> request(session_2) sequence.
if (first_pending_request.request->isReadRequest())
{
LOG_DEBUG(log, "Found read request, We should terminate the processing of committed(write) requests.");
has_read_request = true;
}
else
{
found_in_pending_queue = false;
/// Session of the previous committed(write) request is not same with the current,
/// which means a write_request(session_1) -> request(session_2) sequence.
if (first_pending_request.request->isReadRequest())
{
LOG_DEBUG(log, "Found read request, We should terminate the processing of committed(write) requests.");
has_read_request = true;
std::unique_lock lk(mutex);
found_error = error_request_ids.contains(first_pending_request.getRequestId());
}
else
{
{
std::unique_lock lk(mutex);
found_error = error_request_ids.contains(first_pending_request.getRequestId());
}

if (found_error)
LOG_WARNING(log, "Found error request, We should terminate the processing of committed(write) requests.");
else
process_not_in_pending_queue();
}
if (found_error)
LOG_WARNING(log, "Found error request, We should terminate the processing of committed(write) requests.");
else
process_not_in_pending_queue();
}

return !has_read_request && !found_error;
Expand Down Expand Up @@ -247,8 +246,7 @@ void RequestProcessor::processCommittedRequest(size_t count)
/// apply request
applyRequest(committed_request);
committed_queue.pop();
auto current_time = getCurrentTimeMilliseconds();
Metrics::getMetrics().update_latency->add(current_time - committed_request.create_time);
Metrics::getMetrics().update_latency->add(getCurrentTimeMilliseconds() - committed_request.create_time);

/// remove request from pending queue
auto & pending_requests_for_session = my_pending_requests[committed_request.session_id];
Expand Down
2 changes: 1 addition & 1 deletion src/Service/RequestProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class RequestProcessor
/// We can handle zxid as a continuous stream of committed(write) requests at once.
/// However, if we encounter a read request or an erroneous request,
/// we need to interrupt the processing.
bool shouldProcessCommittedRequest(const RequestForSession & committed_request, bool & found_in_pending_queue);
bool shouldProcessCommittedRequest(RequestForSession & committed_request, bool & found_in_pending_queue);

using RequestForSessions = std::vector<RequestForSession>;

Expand Down
4 changes: 2 additions & 2 deletions src/Service/tests/gtest_raft_state_machine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ TEST(RaftStateMachine, serializeAndParse)
request->acls = default_acls;
session_request.request = request;

using namespace std::chrono;
session_request.create_time = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
session_request.create_time = getCurrentTimeMilliseconds();
session_request.process_time = getCurrentWallTimeMilliseconds();

ptr<buffer> buf = serializeKeeperRequest(session_request);
ptr<RequestForSession> session_request_2 = deserializeKeeperRequest(*(buf.get()));
Expand Down
12 changes: 6 additions & 6 deletions src/Service/tests/raft_test_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ void createZNodeLog(NuRaftStateMachine & machine, const String & key, const Stri
request->acls = default_acls;
request->xid = 1;

using namespace std::chrono;
session_request.create_time = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
session_request.create_time = getCurrentTimeMilliseconds();
session_request.process_time = getCurrentWallTimeMilliseconds();

ptr<buffer> buf = serializeKeeperRequest(session_request);
//LOG_INFO(log, "index {}", index);
Expand Down Expand Up @@ -197,8 +197,8 @@ void setZNode(NuRaftStateMachine & machine, const String & key, const String & d
//request->is_sequential = false;
//request->acls = default_acls;

using namespace std::chrono;
session_request.create_time = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
session_request.create_time = getCurrentTimeMilliseconds();
session_request.process_time = getCurrentWallTimeMilliseconds();

ptr<buffer> buf = serializeKeeperRequest(session_request);
machine.commit(index, *(buf.get()));
Expand All @@ -220,8 +220,8 @@ void removeZNode(NuRaftStateMachine & machine, const String & key)
session_request.request = request;
request->path = key;

using namespace std::chrono;
session_request.create_time = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
session_request.create_time = getCurrentTimeMilliseconds();
session_request.process_time = getCurrentWallTimeMilliseconds();

ptr<buffer> buf = serializeKeeperRequest(session_request);
machine.commit(index, *(buf.get()));
Expand Down
Loading