Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Watches #186

Draft
wants to merge 15 commits into
base: main
Choose a base branch
from
15 changes: 11 additions & 4 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ project(
LANGUAGES C CXX
VERSION ${LSKV_VERSION_SHORT})

option(
COMPILE_TARGET
"Compile target to build for, one of [virtual;sgx;snp], defaults to virtual"
virtual)
set(COMPILE_TARGET
"virtual"
CACHE STRING
"Compile target to build for, one of [virtual;sgx;snp], defaults to virtual"
)

set(CCF "ccf_${COMPILE_TARGET}")

Expand Down Expand Up @@ -42,6 +43,12 @@ option(VERBOSE_LOGGING "enable verbose logging" OFF)

add_compile_definitions(LSKV_VERSION="${LSKV_VERSION}")

# disable log deprecation warnings, we're using old versions of things that
# might not be fully compatible
add_compile_definitions(CCF_LOGGER_NO_DEPRECATE)
# Not sure why this is needed but it gets the protobuf stuff to build
add_compile_definitions(GOOGLE_PROTOBUF_INTERNAL_DONATE_STEAL_INLINE=0)

add_ccf_app(
lskv
SRCS
Expand Down
101 changes: 101 additions & 0 deletions ccf_watches.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
diff --git a/src/endpoints/grpc/grpc.h b/src/endpoints/grpc/grpc.h
index bf7829feb..c29fa47ad 100644
--- a/src/endpoints/grpc/grpc.h
+++ b/src/endpoints/grpc/grpc.h
@@ -68,22 +68,30 @@ namespace ccf::grpc
}
else
{
- const auto message_length = impl::read_message_frame(data, size);
- if (size != message_length)
+ In in;
+ try
{
- throw std::logic_error(fmt::format(
- "Error in gRPC frame: frame size is {} but messages is {} bytes",
- size,
- message_length));
- }
+ const auto message_length = impl::read_message_frame(data, size);
+ if (size != message_length)
+ {
+ throw std::logic_error(fmt::format(
+ "Error in gRPC frame: frame size is {} but messages is {} bytes",
+ size,
+ message_length));
+ }

- In in;
- if (!in.ParseFromArray(data, message_length))
+ if (!in.ParseFromArray(data, message_length))
+ {
+ throw std::logic_error(fmt::format(
+ "Error deserialising protobuf payload of type {}, size {}",
+ in.GetTypeName(),
+ size));
+ }
+ }
+ catch (const std::exception& e)
{
- throw std::logic_error(fmt::format(
- "Error deserialising protobuf payload of type {}, size {}",
- in.GetTypeName(),
- size));
+ // Note: Client streaming!
+ LOG_FAIL_FMT("Error deserialising payload: {}", e.what());
}
return in;
}
diff --git a/src/http/http2_callbacks.h b/src/http/http2_callbacks.h
index 45634231c..d7d46ca44 100644
--- a/src/http/http2_callbacks.h
+++ b/src/http/http2_callbacks.h
@@ -36,7 +36,7 @@ namespace http2
stream_data->outgoing.state == StreamResponseState::Streaming)
{
// Early out: when streaming, avoid calling this callback
- // repeatedly when there no data to read
+ // repeatedly when there is no data to read
return NGHTTP2_ERR_DEFERRED;
}

@@ -122,8 +122,20 @@ namespace http2
return 0;
}

+ auto& headers = stream_data->incoming.headers;
+ std::string url = {};
+ {
+ const auto url_it = headers.find(http2::headers::PATH);
+ if (url_it != headers.end())
+ {
+ url = url_it->second;
+ }
+ }
+
// If the request is complete, process it
- if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM)
+ if (
+ frame->hd.flags & NGHTTP2_FLAG_END_STREAM ||
+ url == "/etcdserverpb.Watch/Watch")
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jumaffre I guess for lease keep alives (also bidi) we'd need to add that path here too?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, as long as client streaming isn't supported in CCF.

{
auto* p = get_parser(user_data);
p->handle_completed(stream_id, stream_data);
diff --git a/src/http/http2_parser.h b/src/http/http2_parser.h
index 4e5b91a33..a4b808b8a 100644
--- a/src/http/http2_parser.h
+++ b/src/http/http2_parser.h
@@ -387,8 +387,11 @@ namespace http2

if (stream_data->outgoing.state != StreamResponseState::Uninitialised)
{
- throw std::logic_error(fmt::format(
- "Stream {} should be uninitialised to start stream", stream_id));
+ // throw std::logic_error(fmt::format(
+ // "Stream {} should be uninitialised to start stream", stream_id));
+
+ stream_data->outgoing.state = StreamResponseState::Streaming;
+ return;
}

stream_data->outgoing.state = StreamResponseState::Streaming;
128 changes: 128 additions & 0 deletions proto/etcd.proto
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,27 @@ message KeyValue
int64 lease = 6;
}

message Event {
enum EventType {
PUT = 0;
DELETE = 1;
}
// type is the kind of event. If type is a PUT, it indicates
// new data has been stored to the key. If type is a DELETE,
// it indicates the key was deleted.
EventType type = 1;
// kv holds the KeyValue for the event.
// A PUT event contains current kv pair.
// A PUT event with kv.Version=1 indicates the creation of a key.
// A DELETE/EXPIRE event contains the deleted key with
// its modification revision set to the revision of deletion.
KeyValue kv = 2;

// prev_kv holds the key-value pair before the event happens.
KeyValue prev_kv = 3;
}


message ResponseHeader
{
// option (versionpb.etcd_version_msg) = "3.0";
Expand Down Expand Up @@ -477,3 +498,110 @@ message StatusResponse
// delay in relationship to the target cluster version.
string storageVersion = 11;
}

message WatchRequest {
// option (versionpb.etcd_version_msg) = "3.0";
// request_union is a request to either create a new watcher or cancel an existing watcher.
oneof request_union {
WatchCreateRequest create_request = 1;
WatchCancelRequest cancel_request = 2;
WatchProgressRequest progress_request = 3 ;//[(versionpb.etcd_version_field)="3.4"];
}
}

message WatchCreateRequest {
// option (versionpb.etcd_version_msg) = "3.0";

// key is the key to register for watching.
bytes key = 1;

// range_end is the end of the range [key, range_end) to watch. If range_end is not given,
// only the key argument is watched. If range_end is equal to '\0', all keys greater than
// or equal to the key argument are watched.
// If the range_end is one bit larger than the given key,
// then all keys with the prefix (the given key) will be watched.
bytes range_end = 2;

// start_revision is an optional revision to watch from (inclusive). No start_revision is "now".
int64 start_revision = 3;

// progress_notify is set so that the etcd server will periodically send a WatchResponse with
// no events to the new watcher if there are no recent events. It is useful when clients
// wish to recover a disconnected watcher starting from a recent known revision.
// The etcd server may decide how often it will send notifications based on current load.
bool progress_notify = 4;

enum FilterType {
// option (versionpb.etcd_version_enum) = "3.1";

// filter out put event.
NOPUT = 0;
// filter out delete event.
NODELETE = 1;
}

// filters filter the events at server side before it sends back to the watcher.
repeated FilterType filters = 5 ;//[(versionpb.etcd_version_field)="3.1"];

// If prev_kv is set, created watcher gets the previous KV before the event happens.
// If the previous KV is already compacted, nothing will be returned.
bool prev_kv = 6 ;//[(versionpb.etcd_version_field)="3.1"];

// If watch_id is provided and non-zero, it will be assigned to this watcher.
// Since creating a watcher in etcd is not a synchronous operation,
// this can be used ensure that ordering is correct when creating multiple
// watchers on the same stream. Creating a watcher with an ID already in
// use on the stream will cause an error to be returned.
int64 watch_id = 7 ;//[(versionpb.etcd_version_field)="3.4"];

// fragment enables splitting large revisions into multiple watch responses.
bool fragment = 8 ;//[(versionpb.etcd_version_field)="3.4"];
}

message WatchCancelRequest {
// option (versionpb.etcd_version_msg) = "3.1";
// watch_id is the watcher id to cancel so that no more events are transmitted.
int64 watch_id = 1 ;//[(versionpb.etcd_version_field)="3.1"];
}

// Requests the a watch stream progress status be sent in the watch response stream as soon as
// possible.
message WatchProgressRequest {
// option (versionpb.etcd_version_msg) = "3.4";
}

message WatchResponse {
// option (versionpb.etcd_version_msg) = "3.0";

ResponseHeader header = 1;
// watch_id is the ID of the watcher that corresponds to the response.
int64 watch_id = 2;

// created is set to true if the response is for a create watch request.
// The client should record the watch_id and expect to receive events for
// the created watcher from the same stream.
// All events sent to the created watcher will attach with the same watch_id.
bool created = 3;

// canceled is set to true if the response is for a cancel watch request.
// No further events will be sent to the canceled watcher.
bool canceled = 4;

// compact_revision is set to the minimum index if a watcher tries to watch
// at a compacted index.
//
// This happens when creating a watcher at a compacted revision or the watcher cannot
// catch up with the progress of the key-value store.
//
// The client should treat the watcher as canceled and should not try to create any
// watcher with the same start_revision again.
int64 compact_revision = 5;

// cancel_reason indicates the reason for canceling the watcher.
string cancel_reason = 6;// [(versionpb.etcd_version_field)="3.4"];

// framgment is true if large watch response was split over multiple responses.
bool fragment = 7 ;//[(versionpb.etcd_version_field)="3.4"];

repeated Event events = 11;
}
91 changes: 90 additions & 1 deletion src/app/app.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@
#include "ccf/historical_queries_adapter.h"
#include "ccf/http_query.h"
#include "ccf/json_handler.h"
#include "ccf/pal/locking.h"
#include "ccf/service/tables/nodes.h"
#include "ccf/service/tables/service.h"
#include "endpoints/grpc/grpc.h" // TODO(#22): private header
#include "endpoints/grpc/stream.h" // TODO(#22): private header
#include "etcd.pb.h"
#include "grpc.h"
#include "index.h"
Expand All @@ -20,6 +22,7 @@
#include "leases.h"
#include "lskvserver.pb.h"
#include "node_data.h"
#include "tls/msg_types.h" // TODO(#22): private header

#include <fmt/ranges.h>

Expand Down Expand Up @@ -255,6 +258,75 @@ namespace app
etcdserverpb::StatusRequest,
etcdserverpb::StatusResponse>(
etcdserverpb, maintenance, "Status", "/v3/maintenance/status", status);

auto watch =
[this](
ccf::endpoints::CommandEndpointContext& ctx,
etcdserverpb::WatchRequest&& payload,
ccf::grpc::StreamPtr<etcdserverpb::WatchResponse>&& out_stream) {
// TODO: For now, CCF accepts incomplete client stream payloads so
// simply ignore those for now
if (!payload.has_create_request())
{
return ccf::grpc::make_pending();
}

this->watch_impl(ctx, std::move(payload), std::move(out_stream));
return ccf::grpc::make_pending();
};
make_command_endpoint(
"/etcdserverpb.Watch/Watch",
HTTP_POST,
ccf::grpc_command_unary_stream_adapter<
etcdserverpb::WatchRequest,
etcdserverpb::WatchResponse>(watch),
{ccf::no_auth_required})
.install();
}

struct Watch
{
int64_t id;
ccf::grpc::DetachedStreamPtr<etcdserverpb::WatchResponse> stream;
};
int64_t next_watch_id = 0;

ccf::pal::Mutex watches_lock;
std::map<std::string, Watch> watches;

void watch_impl(
ccf::endpoints::CommandEndpointContext& ctx,
etcdserverpb::WatchRequest&& payload,
ccf::grpc::StreamPtr<etcdserverpb::WatchResponse>&& out_stream)
{
CCF_APP_INFO("Watch request received {}", payload.DebugString());

// TODO: Only support watch creation for now
if (payload.has_create_request())
{
std::lock_guard<ccf::pal::Mutex> guard(watches_lock);

auto const& create_payload = payload.create_request();
auto watch_id = next_watch_id++;
// notify the client of creation
{
etcdserverpb::WatchResponse response;
response.set_watch_id(watch_id);
response.set_created(true);
auto committed = last_committed_txid();
fill_header(*response.mutable_header(), committed);

out_stream->stream_msg(response);
}

Watch watch = {
watch_id,
ccf::grpc::detach_stream(ctx.rpc_ctx, std::move(out_stream))};
watches.emplace(std::make_pair(create_payload.key(), std::move(watch)));

CCF_APP_INFO(
"Created watch {} for key {}", watch_id, create_payload.key());
}
}

template <typename Out>
Expand Down Expand Up @@ -650,6 +722,24 @@ namespace app

SET_CUSTOM_CLAIMS(put)

std::lock_guard<ccf::pal::Mutex> guard(watches_lock);
auto w = watches.find(payload.key());
if (w != watches.end())
{
auto& watch = w->second;
etcdserverpb::WatchResponse response;
response.set_watch_id(watch.id);
auto* event = response.add_events();
event->set_type(etcdserverpb::Event::PUT);
auto* kv = event->mutable_kv();
kv->set_key(payload.key());
kv->set_value(payload.value());

auto committed = last_committed_txid();
fill_header(*response.mutable_header(), committed);
watch.stream->stream_msg(response);
}

return ccf::grpc::make_success(put_response);
}

Expand Down Expand Up @@ -1432,7 +1522,6 @@ namespace app
return out;
}
};

} // namespace app

namespace ccfapp
Expand Down