From 76d9b41005412ff00b194e9cc288ab71090c11d9 Mon Sep 17 00:00:00 2001 From: mutouyun Date: Sat, 21 Oct 2023 15:46:16 +0800 Subject: [PATCH] `reconnect` cannot reconnect when you are out of authority --- demo/win_service/client/main.cpp | 17 ++++++- src/libipc/ipc.cpp | 72 ++++++++++++++++++----------- src/libipc/platform/win/shm_win.cpp | 17 ++++--- src/libipc/queue.h | 8 +++- 4 files changed, 78 insertions(+), 36 deletions(-) diff --git a/demo/win_service/client/main.cpp b/demo/win_service/client/main.cpp index 5e62d2c4..44a9ba62 100644 --- a/demo/win_service/client/main.cpp +++ b/demo/win_service/client/main.cpp @@ -11,14 +11,27 @@ int _tmain (int argc, TCHAR *argv[]) { ipc::channel ipc_r{ipc::prefix{"Global\\"}, "service ipc r", ipc::receiver}; ipc::channel ipc_w{ipc::prefix{"Global\\"}, "service ipc w", ipc::sender}; while (1) { + if (!ipc_r.reconnect(ipc::receiver)) { + Sleep(1000); + continue; + } auto msg = ipc_r.recv(); if (msg.empty()) { _tprintf(_T("My Sample Client: message recv error\n")); - return -1; + ipc_r.disconnect(); + continue; } printf("My Sample Client: message recv: [%s]\n", (char const *)msg.data()); - while (!ipc_w.send("Copy.")) { + for (;;) { + if (!ipc_w.reconnect(ipc::sender)) { + Sleep(1000); + continue; + } + if (ipc_w.send("Copy.")) { + break; + } _tprintf(_T("My Sample Client: message send error\n")); + ipc_w.disconnect(); Sleep(1000); } _tprintf(_T("My Sample Client: message send [Copy]\n")); diff --git a/src/libipc/ipc.cpp b/src/libipc/ipc.cpp index 38e3f1d7..6b6432e1 100755 --- a/src/libipc/ipc.cpp +++ b/src/libipc/ipc.cpp @@ -112,16 +112,27 @@ struct conn_info_head { ipc::shm::handle acc_h_; conn_info_head(char const * prefix, char const * name) - : prefix_ {ipc::make_string(prefix)} - , name_ {ipc::make_string(name)} - , cc_id_ {} - , cc_waiter_{ipc::make_prefix(prefix_, {"CC_CONN__", name_}).c_str()} - , wt_waiter_{ipc::make_prefix(prefix_, {"WT_CONN__", name_}).c_str()} - , rd_waiter_{ipc::make_prefix(prefix_, {"RD_CONN__", name_}).c_str()} - , acc_h_ {ipc::make_prefix(prefix_, {"AC_CONN__", name_}).c_str(), sizeof(acc_t)} { + : prefix_{ipc::make_string(prefix)} + , name_ {ipc::make_string(name)} + , cc_id_ {} {} + + void init() { + if (!cc_waiter_.valid()) cc_waiter_.open(ipc::make_prefix(prefix_, {"CC_CONN__", name_}).c_str()); + if (!wt_waiter_.valid()) wt_waiter_.open(ipc::make_prefix(prefix_, {"WT_CONN__", name_}).c_str()); + if (!rd_waiter_.valid()) rd_waiter_.open(ipc::make_prefix(prefix_, {"RD_CONN__", name_}).c_str()); + if (!acc_h_.valid()) acc_h_.acquire(ipc::make_prefix(prefix_, {"AC_CONN__", name_}).c_str(), sizeof(acc_t)); + if (cc_id_ != 0) { + return; + } acc_t *pacc = cc_acc(prefix_); - if (pacc != nullptr) { - cc_id_ = pacc->fetch_add(1, std::memory_order_relaxed); + if (pacc == nullptr) { + // Failed to obtain the global accumulator. + return; + } + cc_id_ = pacc->fetch_add(1, std::memory_order_relaxed) + 1; + if (cc_id_ == 0) { + // The identity cannot be 0. + cc_id_ = pacc->fetch_add(1, std::memory_order_relaxed) + 1; } } @@ -362,12 +373,18 @@ struct queue_generator { queue_t que_; conn_info_t(char const * pref, char const * name) - : conn_info_head{pref, name} - , que_{ipc::make_prefix(prefix_, { - "QU_CONN__", - ipc::to_string(DataSize), "__", - ipc::to_string(AlignSize), "__", - name}).c_str()} {} + : conn_info_head{pref, name} { init(); } + + void init() { + conn_info_head::init(); + if (!que_.valid()) { + que_.open(ipc::make_prefix(prefix_, { + "QU_CONN__", + ipc::to_string(DataSize), "__", + ipc::to_string(AlignSize), "__", + this->name_}).c_str()); + } + } void disconnect_receiver() { bool dis = que_.disconnect(); @@ -397,6 +414,18 @@ constexpr static queue_t* queue_of(ipc::handle_t h) noexcept { /* API implementations */ +static bool connect(ipc::handle_t * ph, ipc::prefix pref, char const * name, bool start_to_recv) { + assert(ph != nullptr); + if (*ph == nullptr) { + *ph = ipc::mem::alloc(pref.str, name); + } + return reconnect(ph, start_to_recv); +} + +static bool connect(ipc::handle_t * ph, char const * name, bool start_to_recv) { + return connect(ph, {nullptr}, name, start_to_recv); +} + static void disconnect(ipc::handle_t h) { auto que = queue_of(h); if (que == nullptr) { @@ -414,6 +443,7 @@ static bool reconnect(ipc::handle_t * ph, bool start_to_recv) { if (que == nullptr) { return false; } + info_of(*ph)->init(); if (start_to_recv) { que->shut_sending(); if (que->connect()) { // wouldn't connect twice @@ -429,18 +459,6 @@ static bool reconnect(ipc::handle_t * ph, bool start_to_recv) { return que->ready_sending(); } -static bool connect(ipc::handle_t * ph, ipc::prefix pref, char const * name, bool start_to_recv) { - assert(ph != nullptr); - if (*ph == nullptr) { - *ph = ipc::mem::alloc(pref.str, name); - } - return reconnect(ph, start_to_recv); -} - -static bool connect(ipc::handle_t * ph, char const * name, bool start_to_recv) { - return connect(ph, {nullptr}, name, start_to_recv); -} - static void destroy(ipc::handle_t h) { disconnect(h); ipc::mem::free(info_of(h)); diff --git a/src/libipc/platform/win/shm_win.cpp b/src/libipc/platform/win/shm_win.cpp index 75c2a95c..4b7016ef 100755 --- a/src/libipc/platform/win/shm_win.cpp +++ b/src/libipc/platform/win/shm_win.cpp @@ -37,21 +37,26 @@ id_t acquire(char const * name, std::size_t size, unsigned mode) { // Opens a named file mapping object. if (mode == open) { h = ::OpenFileMapping(FILE_MAP_ALL_ACCESS, FALSE, fmt_name.c_str()); + if (h == NULL) { + ipc::error("fail OpenFileMapping[%d]: %s\n", static_cast(::GetLastError()), name); + return nullptr; + } } // Creates or opens a named file mapping object for a specified file. else { h = ::CreateFileMapping(INVALID_HANDLE_VALUE, detail::get_sa(), PAGE_READWRITE | SEC_COMMIT, 0, static_cast(size), fmt_name.c_str()); + DWORD err = ::GetLastError(); // If the object exists before the function call, the function returns a handle to the existing object // (with its current size, not the specified size), and GetLastError returns ERROR_ALREADY_EXISTS. - if ((mode == create) && (::GetLastError() == ERROR_ALREADY_EXISTS)) { - ::CloseHandle(h); + if ((mode == create) && (err == ERROR_ALREADY_EXISTS)) { + if (h != NULL) ::CloseHandle(h); h = NULL; } - } - if (h == NULL) { - ipc::error("fail CreateFileMapping/OpenFileMapping[%d]: %s\n", static_cast(::GetLastError()), name); - return nullptr; + if (h == NULL) { + ipc::error("fail CreateFileMapping[%d]: %s\n", static_cast(err), name); + return nullptr; + } } auto ii = mem::alloc(); ii->h_ = h; diff --git a/src/libipc/queue.h b/src/libipc/queue.h index bb925ce3..e38fe778 100755 --- a/src/libipc/queue.h +++ b/src/libipc/queue.h @@ -104,7 +104,7 @@ class queue_base : public queue_conn { explicit queue_base(char const * name) : queue_base{} { - elems_ = open(name); + elems_ = queue_conn::template open(name); } explicit queue_base(elems_t * elems) noexcept @@ -117,6 +117,12 @@ class queue_base : public queue_conn { base_t::close(); } + bool open(char const * name) noexcept { + base_t::close(); + elems_ = queue_conn::template open(name); + return elems_ != nullptr; + } + elems_t * elems() noexcept { return elems_; } elems_t const * elems() const noexcept { return elems_; }