Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support on close callbacks for C++ client #4417

Merged
merged 8 commits into from
Aug 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cpp-client/deephaven/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ add_compile_options(-Wall -Werror -Wno-deprecated-declarations)

add_subdirectory(dhclient)
add_subdirectory(dhcore)
add_subdirectory(tests EXCLUDE_FROM_ALL)
add_subdirectory(examples EXCLUDE_FROM_ALL)
add_subdirectory(tests)
add_subdirectory(examples)

install(DIRECTORY dhclient/include/public/
DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
#pragma once

#include <memory>
#include <mutex>
#include "deephaven/client/utility/misc_types.h"
#include "deephaven/client/impl/table_handle_manager_impl.h"
#include "deephaven/client/server/server.h"
#include "deephaven/client/utility/executor.h"
Expand All @@ -27,14 +29,23 @@ class ClientImpl {
ClientImpl(Private, std::shared_ptr<TableHandleManagerImpl> &&manager_impl);
~ClientImpl();

void Shutdown() {
managerImpl_->Shutdown();
}
void Shutdown();

[[nodiscard]]
const std::shared_ptr<TableHandleManagerImpl> &ManagerImpl() const { return managerImpl_; }
const std::shared_ptr<TableHandleManagerImpl> &ManagerImpl() const { return manager_impl_; }

using OnCloseCbId = utility::OnCloseCbId;
using OnCloseCb = utility::OnCloseCb;

OnCloseCbId AddOnCloseCallback(OnCloseCb cb);
bool RemoveOnCloseCallback(OnCloseCbId cb_id);

private:
std::shared_ptr<TableHandleManagerImpl> managerImpl_;
std::shared_ptr<TableHandleManagerImpl> manager_impl_;
struct {
mutable std::mutex mux;
std::uint32_t next_id;
std::map<OnCloseCbId, OnCloseCb> map;
} on_close_;
};
} // namespace deephaven::client::impl
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,24 @@ class Client {
[[nodiscard]]
TableHandleManager GetManager() const;

using OnCloseCbId = utility::OnCloseCbId;
using OnCloseCb = utility::OnCloseCb;

/**
* Adds a callback to be invoked when this client is closed.
*
* @param cb the callback
* @return an id for the added callback that can be used to remove it.
*/
OnCloseCbId AddOnCloseCallback(OnCloseCb cb);

/**
* Removes an on close callback.
* @param cb_id the id of the callback to remove
* @return true if a callback with that id was found and removed, false otherwise.
*/
bool RemoveOnCloseCallback(OnCloseCbId cb_id);

private:
explicit Client(std::shared_ptr<impl::ClientImpl> impl);
std::shared_ptr<impl::ClientImpl> impl_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
#pragma once

#include <chrono>
#include <cstdint>
#include <functional>
#include <string>
#include <variant>

Expand All @@ -20,4 +22,20 @@ using TimePointSpecifier = std::variant<std::chrono::system_clock::time_point, i
* string.
*/
using DurationSpecifier = std::variant<std::chrono::nanoseconds, int64_t, std::string>;

/**
* Used to identify OnClose callbacks, eg, allowing their removal after addition.
*/
struct OnCloseCbId {
std::uint32_t id;
friend bool operator<(const OnCloseCbId lhs, const OnCloseCbId rhs) {
return lhs.id < rhs.id;
}
};

/**
* An OnClose callback.
*/
using OnCloseCb = std::function<void()>;

} // namespace deephaven::client::utility
41 changes: 21 additions & 20 deletions cpp-client/deephaven/dhclient/src/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
*/
#include "deephaven/client/client.h"

#include <grpc/support/log.h>
#include <stdexcept>

#include <arrow/array.h>
#include <arrow/scalar.h>
#include "deephaven/client/columns.h"
Expand All @@ -15,47 +16,32 @@
#include "deephaven/client/impl/table_handle_impl.h"
#include "deephaven/client/impl/table_handle_manager_impl.h"
#include "deephaven/client/impl/update_by_operation_impl.h"
#include "deephaven/client/impl/util.h"
#include "deephaven/client/subscription/subscription_handle.h"
#include "deephaven/client/utility/arrow_util.h"
#include "deephaven/dhcore/clienttable/schema.h"
#include "deephaven/dhcore/utility/utility.h"
#include "deephaven/proto/table.pb.h"
#include "deephaven/proto/table.grpc.pb.h"

using grpc::Channel;
using grpc::ClientContext;
using grpc::ClientReader;
using io::deephaven::proto::backplane::grpc::ComboAggregateRequest;
using io::deephaven::proto::backplane::grpc::HandshakeRequest;
using io::deephaven::proto::backplane::grpc::HandshakeResponse;
using io::deephaven::proto::backplane::grpc::Ticket;
using deephaven::client::server::Server;
using deephaven::client::Column;
using deephaven::client::DateTimeCol;
using deephaven::client::NumCol;
using deephaven::client::StrCol;
using deephaven::client::impl::StrColImpl;
using deephaven::client::impl::AggregateComboImpl;
using deephaven::client::impl::AggregateImpl;
using deephaven::client::impl::ClientImpl;
using deephaven::client::impl::MoveVectorData;
using deephaven::client::impl::UpdateByOperationImpl;
using deephaven::client::subscription::SubscriptionHandle;
using deephaven::client::utility::Executor;
using deephaven::client::utility::OkOrThrow;
using deephaven::dhcore::clienttable::Schema;
using deephaven::dhcore::utility::Base64Encode;
using deephaven::dhcore::utility::MakeReservedVector;
using deephaven::dhcore::utility::separatedList;
using deephaven::dhcore::utility::SFCallback;
using deephaven::dhcore::utility::SimpleOstringstream;
using deephaven::dhcore::utility::Stringf;


namespace deephaven::client {
namespace {
void printTableData(std::ostream &s, const TableHandle &table_handle, bool want_headers);
void PrintTableData(std::ostream &s, const TableHandle &table_handle, bool want_headers);
void CheckNotClosedOrThrow(const std::shared_ptr<ClientImpl> &impl);
} // namespace

Client Client::Connect(const std::string &target, const ClientOptions &options) {
Expand Down Expand Up @@ -89,9 +75,19 @@ void Client::Close() {
}

TableHandleManager Client::GetManager() const {
CheckNotClosedOrThrow(impl_);
return TableHandleManager(impl_->ManagerImpl());
}

Client::OnCloseCbId Client::AddOnCloseCallback(std::function<void()> cb) {
CheckNotClosedOrThrow(impl_);
return impl_->AddOnCloseCallback(std::move(cb));
}

bool Client::RemoveOnCloseCallback(OnCloseCbId cb_id) {
CheckNotClosedOrThrow(impl_);
return impl_->RemoveOnCloseCallback(std::move(cb_id));
}

TableHandleManager::TableHandleManager() = default;
TableHandleManager::TableHandleManager(std::shared_ptr<impl::TableHandleManagerImpl> impl) : impl_(std::move(impl)) {}
Expand Down Expand Up @@ -619,7 +615,7 @@ TableHandleStreamAdaptor::TableHandleStreamAdaptor(TableHandle table, bool want_
TableHandleStreamAdaptor::~TableHandleStreamAdaptor() = default;

std::ostream &operator<<(std::ostream &s, const TableHandleStreamAdaptor &o) {
printTableData(s, o.table_, o.wantHeaders_);
PrintTableData(s, o.table_, o.wantHeaders_);
return s;
}

Expand All @@ -632,7 +628,7 @@ std::string ConvertToString::ToString(
} // namespace internal

namespace {
void printTableData(std::ostream &s, const TableHandle &table_handle, bool want_headers) {
void PrintTableData(std::ostream &s, const TableHandle &table_handle, bool want_headers) {
auto fsr = table_handle.GetFlightStreamReader();

if (want_headers) {
Expand Down Expand Up @@ -665,5 +661,10 @@ void printTableData(std::ostream &s, const TableHandle &table_handle, bool want_
}
}
}
void CheckNotClosedOrThrow(const std::shared_ptr<ClientImpl> &impl) {
if (impl == nullptr) {
throw std::runtime_error(DEEPHAVEN_LOCATION_STR("client is already closed"));
}
}
} // namespace
} // namespace deephaven::client
33 changes: 27 additions & 6 deletions cpp-client/deephaven/dhclient/src/impl/client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,18 @@
#include "deephaven/client/impl/client_impl.h"

#include <memory>
#include <mutex>
#include <optional>
#include <stdexcept>
#include "deephaven/client/impl/table_handle_manager_impl.h"
#include "deephaven/dhcore/utility/callbacks.h"

using io::deephaven::proto::backplane::grpc::HandshakeResponse;
using io::deephaven::proto::backplane::grpc::Ticket;
using io::deephaven::proto::backplane::script::grpc::StartConsoleResponse;

using deephaven::client::impl::TableHandleManagerImpl;
using deephaven::client::server::Server;
using deephaven::client::utility::Executor;
using deephaven::dhcore::utility::SFCallback;

namespace deephaven::client {
namespace impl {
Expand All @@ -25,25 +24,47 @@ std::shared_ptr<ClientImpl> ClientImpl::Create(
std::shared_ptr<Executor> executor,
std::shared_ptr<Executor> flight_executor,
const std::string &session_type) {
std::optional<Ticket> consoleTicket;
std::optional<Ticket> console_ticket;
if (!session_type.empty()) {
auto cb = SFCallback<StartConsoleResponse>::CreateForFuture();
server->StartConsoleAsync(session_type, std::move(cb.first));
StartConsoleResponse scr = std::move(std::get<0>(cb.second.get()));
consoleTicket = std::move(*scr.mutable_result_id());
console_ticket = std::move(*scr.mutable_result_id());
}

auto thmi = TableHandleManagerImpl::Create(
std::move(consoleTicket),
std::move(console_ticket),
std::move(server),
std::move(executor),
std::move(flight_executor));
return std::make_shared<ClientImpl>(Private(), std::move(thmi));
}

ClientImpl::ClientImpl(Private, std::shared_ptr<TableHandleManagerImpl> &&manager_impl) :
managerImpl_(std::move(manager_impl)) {}
manager_impl_(std::move(manager_impl)) {}

ClientImpl::~ClientImpl() = default;

ClientImpl::OnCloseCbId ClientImpl::AddOnCloseCallback(OnCloseCb cb) {
std::unique_lock lock(on_close_.mux);
OnCloseCbId id({on_close_.next_id++});
on_close_.map[id] = std::move(cb);
return id;
}

bool ClientImpl::RemoveOnCloseCallback(OnCloseCbId cb_id) {
std::unique_lock lock(on_close_.mux);
return on_close_.map.erase(std::move(cb_id)) > 0;
}

void ClientImpl::Shutdown() {
manager_impl_->Shutdown();
std::unique_lock lock(on_close_.mux);
auto map = std::move(on_close_.map);
lock.unlock();
for (const auto &entry : map) {
entry.second();
}
}
} // namespace impl
} // namespace deephaven::client
1 change: 1 addition & 0 deletions cpp-client/deephaven/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ add_executable(tests
main.cc
merge_tables_test.cc
new_table_test.cc
on_close_cb_test.cc
script_test.cc
select_test.cc
sort_test.cc
Expand Down
26 changes: 26 additions & 0 deletions cpp-client/deephaven/tests/on_close_cb_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending
*/
#include <iostream>
#include "tests/third_party/catch.hpp"
#include "tests/test_util.h"
#include "deephaven/client/client.h"

using deephaven::client::Client;

namespace deephaven::client::tests {
TEST_CASE("On Close Callbacks can be added and removed and are executed", "[simple]") {
auto client = TableMakerForTests::CreateClient();
bool cb_1_called = false;
auto cb_1 = [&cb_1_called]{ cb_1_called = true; };
bool cb_2_called = false;
auto cb_2 = [&cb_2_called]{ cb_2_called = true; };
const auto id_1 = client.AddOnCloseCallback(cb_1);
(void) client.AddOnCloseCallback(cb_2);
const bool removed = client.RemoveOnCloseCallback(id_1);
CHECK(removed);
client.Close();
CHECK(!cb_1_called);
CHECK(cb_2_called);
}
}