Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

db: temporal KV client index_range API via gRPC #2189

Merged
merged 5 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 114 additions & 1 deletion cmd/dev/grpc_toolbox.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
#include <silkworm/core/common/bytes_to_string.hpp>
#include <silkworm/core/common/util.hpp>
#include <silkworm/core/types/address.hpp>
#include <silkworm/db/kv/api/state_cache.hpp>
#include <silkworm/db/kv/grpc/client/remote_client.hpp>
#include <silkworm/infra/common/log.hpp>
#include <silkworm/infra/grpc/client/client_context_pool.hpp>
#include <silkworm/infra/grpc/common/util.hpp>
Expand All @@ -40,6 +42,7 @@
#include <silkworm/interfaces/types/types.pb.h>
#include <silkworm/rpc/common/constants.hpp>
#include <silkworm/rpc/ethbackend/remote_backend.hpp>
#include <silkworm/rpc/ethdb/kv/backend_providers.hpp>

using namespace silkworm;
using namespace silkworm::rpc;
Expand Down Expand Up @@ -691,6 +694,7 @@ ABSL_FLAG(std::string, tool, "", "gRPC remote interface tool name as string");
ABSL_FLAG(std::string, target, kDefaultPrivateApiAddr, "Silkworm location as string <address>:<port>");
ABSL_FLAG(std::string, table, "", "database table name as string");
ABSL_FLAG(uint32_t, timeout, kDefaultTimeout.count(), "gRPC call timeout as integer");
ABSL_FLAG(bool, verbose, false, "verbose output");

int ethbackend_async() {
auto target{absl::GetFlag(FLAGS_target)};
Expand Down Expand Up @@ -841,6 +845,111 @@ int kv_seek() {
return kv_seek(target, table_name, key_bytes.value());
}

Task<void> kv_index_range_query(const std::shared_ptr<db::kv::api::Service>& kv_service,
db::kv::api::IndexRangeQuery&& query,
const bool verbose) {
try {
auto tx = co_await kv_service->begin_transaction();
std::cout << "KV IndexRange -> " << query.table << "\n";
auto paginated_result = co_await tx->index_range(std::move(query));
auto it = co_await paginated_result.begin();
std::cout << "KV IndexRange <- #timestamps: ";
int count{0};
db::kv::api::ListOfTimestamp timestamps;
while (it != paginated_result.end()) {
timestamps.emplace_back(*it);
++count;
co_await ++it;
}
std::cout << count << "\n";
if (verbose) {
for (const auto ts : timestamps) {
std::cout << ts << "\n";
}
}
co_await tx->close();
} catch (const std::exception& e) {
std::cout << "KV IndexRange <- error: " << e.what() << "\n";
}
}

int kv_index_range_coro(const std::string& target, const std::string& table, const Bytes& key, const bool verbose) {
try {
ClientContextPool context_pool{1};
auto& context = context_pool.next_context();
auto io_context = context.io_context();
auto grpc_context = context.grpc_context();

boost::asio::signal_set signals(*io_context, SIGINT, SIGTERM);
signals.async_wait([&](const boost::system::error_code& error, int signal_number) {
std::cout << "Signal caught, error: " << error.message() << " number: " << signal_number << std::endl
<< std::flush;
context_pool.stop();
});

auto channel_factory = [target]() -> std::shared_ptr<::grpc::Channel> {
return ::grpc::CreateChannel(target, grpc::InsecureChannelCredentials());
};

// ETHBACKEND
ethbackend::RemoteBackEnd eth_backend{*io_context, channel_factory(), *grpc_context};
// DB KV API client
db::kv::api::CoherentStateCache state_cache;
db::kv::grpc::client::RemoteClient client{channel_factory,
*grpc_context,
&state_cache,
rpc::ethdb::kv::block_provider(&eth_backend),
rpc::ethdb::kv::block_number_from_txn_hash_provider(&eth_backend)};
auto kv_service = client.service();
db::kv::api::IndexRangeQuery query{
.table = table,
.key = key,
.from_timestamp = 0,
.to_timestamp = -1,
.ascending_order = true,
};
// NOLINTNEXTLINE(performance-unnecessary-value-param)
boost::asio::co_spawn(*io_context, kv_index_range_query(kv_service, std::move(query), verbose), [&](std::exception_ptr) {
context_pool.stop();
});

context_pool.run();
} catch (const std::exception& e) {
std::cerr << "Exception: " << e.what() << "\n";
} catch (...) {
std::cerr << "Unexpected exception\n";
}
return 0;
}

int kv_index_range() {
const auto target{absl::GetFlag(FLAGS_target)};
if (target.empty() || !absl::StrContains(target, ":")) {
std::cerr << "Parameter target is invalid: [" << target << "]\n";
std::cerr << "Use --target flag to specify the location of Erigon running instance\n";
return -1;
}

const auto table_name{absl::GetFlag(FLAGS_table)};
if (table_name.empty()) {
std::cerr << "Parameter table is invalid: [" << table_name << "]\n";
std::cerr << "Use --table flag to specify the name of Erigon database table\n";
return -1;
}

const auto key{absl::GetFlag(FLAGS_key)};
const auto key_bytes = silkworm::from_hex(key);
if (key.empty() || !key_bytes.has_value()) {
std::cerr << "Parameter key is invalid: [" << key << "]\n";
std::cerr << "Use --key flag to specify the key in key-value dupsort table\n";
return -1;
}

const auto verbose{absl::GetFlag(FLAGS_verbose)};

return kv_index_range_coro(target, table_name, *key_bytes, verbose);
}

int main(int argc, char* argv[]) {
absl::SetProgramUsageMessage(
"Execute specified internal gRPC I/F tool:\n"
Expand All @@ -850,7 +959,8 @@ int main(int argc, char* argv[]) {
"\tkv_seek\t\t\t\tquery using SEEK the Erigon/Silkworm Key-Value (KV) remote interface to database\n"
"\tkv_seek_async\t\t\tquery using SEEK the Erigon/Silkworm Key-Value (KV) remote interface to database\n"
"\tkv_seek_async_callback\t\tquery using SEEK the Erigon/Silkworm Key-Value (KV) remote interface to database\n"
"\tkv_seek_both\t\t\tquery using SEEK_BOTH the Erigon/Silkworm Key-Value (KV) remote interface to database\n");
"\tkv_seek_both\t\t\tquery using SEEK_BOTH the Erigon/Silkworm Key-Value (KV) remote interface to database\n"
"\tkv_index_range\t\tquery using INDEX_RANGE the Erigon/Silkworm Key-Value (KV) remote interface to database\n");
const auto positional_args = absl::ParseCommandLine(argc, argv);
if (positional_args.size() < 2) {
std::cerr << "No gRPC tool specified as first positional argument\n\n";
Expand Down Expand Up @@ -882,6 +992,9 @@ int main(int argc, char* argv[]) {
if (tool == "kv_seek") {
return kv_seek();
}
if (tool == "kv_index_range") {
return kv_index_range();
}

std::cerr << "Unknown tool " << tool << " specified as first argument\n\n";
std::cerr << absl::ProgramUsageMessage();
Expand Down
6 changes: 0 additions & 6 deletions silkworm/db/kv/api/direct_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,6 @@ Task<DomainPointResult> DirectService::get_domain(const DomainPointQuery&) {

/** Temporal Range Queries **/

// rpc IndexRange(IndexRangeReq) returns (IndexRangeReply);
Task<IndexRangeResult> DirectService::get_index_range(const IndexRangeQuery&) {
// TODO(canepat) implement
co_return IndexRangeResult{};
}

// rpc HistoryRange(HistoryRangeReq) returns (Pairs);
Task<HistoryRangeResult> DirectService::get_history_range(const HistoryRangeQuery&) {
// TODO(canepat) implement
Expand Down
3 changes: 0 additions & 3 deletions silkworm/db/kv/api/direct_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,6 @@ class DirectService : public Service {

/** Temporal Range Queries **/

// rpc IndexRange(IndexRangeReq) returns (IndexRangeReply);
Task<IndexRangeResult> get_index_range(const IndexRangeQuery&) override;

// rpc HistoryRange(HistoryRangeReq) returns (Pairs);
Task<HistoryRangeResult> get_history_range(const HistoryRangeQuery&) override;

Expand Down
7 changes: 7 additions & 0 deletions silkworm/db/kv/api/endpoint/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
#pragma once

#include <cstdint>
#include <string>
#include <utility>
#include <vector>

#include <silkworm/core/common/bytes.hpp>
Expand All @@ -25,8 +27,13 @@ namespace silkworm::db::kv::api {

using TxId = uint64_t;
using Timestamp = int64_t;
using TimestampRange = std::pair<Timestamp, Timestamp>;

using ListOfBytes = std::vector<Bytes>;
using ListOfTimestamp = std::vector<Timestamp>;

using Domain = uint16_t;
using History = std::string_view;
using InvertedIndex = std::string_view;

} // namespace silkworm::db::kv::api
12 changes: 9 additions & 3 deletions silkworm/db/kv/api/endpoint/temporal_range.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,21 @@
#include <silkworm/core/common/bytes.hpp>

#include "common.hpp"
#include "paginated_sequence.hpp"

namespace silkworm::db::kv::api {

//! Unlimited range size in range queries
inline constexpr int64_t kUnlimited{-1};

struct IndexRangeQuery {
TxId tx_id{0};
std::string table;
Bytes key;
Timestamp from_timestamp;
Timestamp to_timestamp;
bool ascending_order{false};
uint64_t limit{0};
int64_t limit{kUnlimited};
uint32_t page_size{0};
std::string page_token;
};
Expand All @@ -54,7 +58,7 @@ struct HistoryRangeQuery {
Timestamp from_timestamp;
Timestamp to_timestamp;
bool ascending_order{false};
uint64_t limit{0};
int64_t limit{kUnlimited};
uint32_t page_size{0};
std::string page_token;
};
Expand All @@ -67,11 +71,13 @@ struct DomainRangeQuery {
Bytes from_key;
Bytes to_key;
bool ascending_order{false};
uint64_t limit{0};
int64_t limit{kUnlimited};
uint32_t page_size{0};
std::string page_token;
};

using DomainRangeResult = RangeResult;

using PaginatedTimestamps = PaginatedSequence<Timestamp>;

} // namespace silkworm::db::kv::api
8 changes: 8 additions & 0 deletions silkworm/db/kv/api/local_transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,12 @@ std::shared_ptr<chain::ChainStorage> LocalTransaction::create_storage() {
return std::make_shared<chain::LocalChainStorage>(txn_);
}

Task<PaginatedTimestamps> LocalTransaction::index_range(api::IndexRangeQuery&& /*query*/) {
// TODO(canepat) implement using E3-like aggregator abstraction [tx_id_ must be changed]
auto paginator = []() mutable -> Task<api::PaginatedTimestamps::PageResult> {
co_return api::PaginatedTimestamps::PageResult{};
};
co_return api::PaginatedTimestamps{std::move(paginator)};
}

} // namespace silkworm::db::kv::api
3 changes: 3 additions & 0 deletions silkworm/db/kv/api/local_transaction.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ class LocalTransaction : public BaseTransaction {

Task<void> close() override;

// rpc IndexRange(IndexRangeReq) returns (IndexRangeReply);
Task<PaginatedTimestamps> index_range(IndexRangeQuery&& query) override;

private:
Task<std::shared_ptr<CursorDupSort>> get_cursor(const std::string& table, bool is_cursor_dup_sort);

Expand Down
3 changes: 0 additions & 3 deletions silkworm/db/kv/api/service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,6 @@ struct Service {

/** Temporal Range Queries **/

// rpc IndexRange(IndexRangeReq) returns (IndexRangeReply);
virtual Task<IndexRangeResult> get_index_range(const IndexRangeQuery&) = 0;

// rpc HistoryRange(HistoryRangeReq) returns (Pairs);
virtual Task<HistoryRangeResult> get_history_range(const HistoryRangeQuery&) = 0;

Expand Down
6 changes: 6 additions & 0 deletions silkworm/db/kv/api/transaction.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

#include "cursor.hpp"
#include "endpoint/key_value.hpp"
#include "endpoint/temporal_range.hpp"

namespace silkworm::db::kv::api {

Expand Down Expand Up @@ -63,6 +64,11 @@ class Transaction {
virtual Task<Bytes> get_one(const std::string& table, ByteView key) = 0;

virtual Task<std::optional<Bytes>> get_both_range(const std::string& table, ByteView key, ByteView subkey) = 0;

/** Temporal Range Queries **/

// rpc IndexRange(IndexRangeReq) returns (IndexRangeReply);
virtual Task<PaginatedTimestamps> index_range(IndexRangeQuery&& query) = 0;
};

} // namespace silkworm::db::kv::api
8 changes: 4 additions & 4 deletions silkworm/db/kv/grpc/client/endpoint/temporal_range.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ proto::IndexRangeReq index_range_request_from_query(const api::IndexRangeQuery&
proto::IndexRangeReq request;
request.set_tx_id(query.tx_id);
request.set_table(query.table);
request.set_k(to_hex(query.key));
request.set_k(query.key.data(), query.key.size());
request.set_from_ts(query.from_timestamp);
request.set_to_ts(query.to_timestamp);
request.set_order_ascend(query.ascending_order);
request.set_limit(static_cast<int64_t>(query.limit));
request.set_limit(query.limit);
request.set_page_size(static_cast<int32_t>(query.page_size));
request.set_page_token(query.page_token);
return request;
Expand Down Expand Up @@ -75,8 +75,8 @@ ::remote::DomainRangeReq domain_range_request_from_query(const api::DomainRangeQ
::remote::DomainRangeReq request;
request.set_tx_id(query.tx_id);
request.set_table(query.table);
request.set_from_key(to_hex(query.from_key));
request.set_to_key(to_hex(query.to_key));
request.set_from_key(query.from_key.data(), query.from_key.size());
request.set_to_key(query.to_key.data(), query.to_key.size());
request.set_order_ascend(query.ascending_order);
request.set_limit(static_cast<int64_t>(query.limit));
request.set_page_size(static_cast<int32_t>(query.page_size));
Expand Down
6 changes: 3 additions & 3 deletions silkworm/db/kv/grpc/client/endpoint/temporal_range_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ namespace proto = ::remote;

TEST_CASE("index_range_request_from_query", "[node][remote][kv][grpc]") {
const Fixtures<api::IndexRangeQuery, proto::IndexRangeReq> fixtures{
{{}, {}},
{{}, default_proto_index_range_request()},
{sample_index_range_query(), sample_proto_index_range_request()},
};
for (const auto& [query, expected_range_request] : fixtures) {
Expand Down Expand Up @@ -68,7 +68,7 @@ TEST_CASE("index_range_result_from_response", "[node][remote][kv][grpc]") {

TEST_CASE("history_range_request_from_query", "[node][remote][kv][grpc]") {
const Fixtures<api::HistoryRangeQuery, proto::HistoryRangeReq> fixtures{
{{}, {}},
{{}, default_proto_history_range_request()},
{sample_history_range_query(), sample_proto_history_range_request()},
};
for (const auto& [query, expected_range_request] : fixtures) {
Expand Down Expand Up @@ -105,7 +105,7 @@ TEST_CASE("history_range_result_from_response", "[node][remote][kv][grpc]") {

TEST_CASE("domain_range_request_from_query", "[node][remote][kv][grpc]") {
const Fixtures<api::DomainRangeQuery, proto::DomainRangeReq> fixtures{
{{}, {}},
{{}, default_proto_domain_range_request()},
{sample_domain_range_query(), sample_proto_domain_range_request()},
};
for (const auto& [query, expected_range_request] : fixtures) {
Expand Down
Loading
Loading