diff --git a/CMakeLists.txt b/CMakeLists.txt index 77089a693..f80bffffd 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -10,10 +10,11 @@ project( LANGUAGES C CXX VERSION ${LSKV_VERSION_SHORT}) -option( - COMPILE_TARGET - "Compile target to build for, one of [virtual;sgx;snp], defaults to virtual" - virtual) +set(COMPILE_TARGET + "virtual" + CACHE STRING + "Compile target to build for, one of [virtual;sgx;snp], defaults to virtual" +) set(CCF "ccf_${COMPILE_TARGET}") @@ -42,6 +43,12 @@ option(VERBOSE_LOGGING "enable verbose logging" OFF) add_compile_definitions(LSKV_VERSION="${LSKV_VERSION}") +# disable log deprecation warnings, we're using old versions of things that +# might not be fully compatible +add_compile_definitions(CCF_LOGGER_NO_DEPRECATE) +# Not sure why this is needed but it gets the protobuf stuff to build +add_compile_definitions(GOOGLE_PROTOBUF_INTERNAL_DONATE_STEAL_INLINE=0) + add_ccf_app( lskv SRCS diff --git a/ccf_watches.patch b/ccf_watches.patch new file mode 100644 index 000000000..600cb3854 --- /dev/null +++ b/ccf_watches.patch @@ -0,0 +1,101 @@ +diff --git a/src/endpoints/grpc/grpc.h b/src/endpoints/grpc/grpc.h +index bf7829feb..c29fa47ad 100644 +--- a/src/endpoints/grpc/grpc.h ++++ b/src/endpoints/grpc/grpc.h +@@ -68,22 +68,30 @@ namespace ccf::grpc + } + else + { +- const auto message_length = impl::read_message_frame(data, size); +- if (size != message_length) ++ In in; ++ try + { +- throw std::logic_error(fmt::format( +- "Error in gRPC frame: frame size is {} but messages is {} bytes", +- size, +- message_length)); +- } ++ const auto message_length = impl::read_message_frame(data, size); ++ if (size != message_length) ++ { ++ throw std::logic_error(fmt::format( ++ "Error in gRPC frame: frame size is {} but messages is {} bytes", ++ size, ++ message_length)); ++ } + +- In in; +- if (!in.ParseFromArray(data, message_length)) ++ if (!in.ParseFromArray(data, message_length)) ++ { ++ throw std::logic_error(fmt::format( ++ "Error deserialising protobuf payload of type {}, size {}", ++ in.GetTypeName(), ++ size)); ++ } ++ } ++ catch (const std::exception& e) + { +- throw std::logic_error(fmt::format( +- "Error deserialising protobuf payload of type {}, size {}", +- in.GetTypeName(), +- size)); ++ // Note: Client streaming! ++ LOG_FAIL_FMT("Error deserialising payload: {}", e.what()); + } + return in; + } +diff --git a/src/http/http2_callbacks.h b/src/http/http2_callbacks.h +index 45634231c..d7d46ca44 100644 +--- a/src/http/http2_callbacks.h ++++ b/src/http/http2_callbacks.h +@@ -36,7 +36,7 @@ namespace http2 + stream_data->outgoing.state == StreamResponseState::Streaming) + { + // Early out: when streaming, avoid calling this callback +- // repeatedly when there no data to read ++ // repeatedly when there is no data to read + return NGHTTP2_ERR_DEFERRED; + } + +@@ -122,8 +122,20 @@ namespace http2 + return 0; + } + ++ auto& headers = stream_data->incoming.headers; ++ std::string url = {}; ++ { ++ const auto url_it = headers.find(http2::headers::PATH); ++ if (url_it != headers.end()) ++ { ++ url = url_it->second; ++ } ++ } ++ + // If the request is complete, process it +- if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM) ++ if ( ++ frame->hd.flags & NGHTTP2_FLAG_END_STREAM || ++ url == "/etcdserverpb.Watch/Watch") + { + auto* p = get_parser(user_data); + p->handle_completed(stream_id, stream_data); +diff --git a/src/http/http2_parser.h b/src/http/http2_parser.h +index 4e5b91a33..a4b808b8a 100644 +--- a/src/http/http2_parser.h ++++ b/src/http/http2_parser.h +@@ -387,8 +387,11 @@ namespace http2 + + if (stream_data->outgoing.state != StreamResponseState::Uninitialised) + { +- throw std::logic_error(fmt::format( +- "Stream {} should be uninitialised to start stream", stream_id)); ++ // throw std::logic_error(fmt::format( ++ // "Stream {} should be uninitialised to start stream", stream_id)); ++ ++ stream_data->outgoing.state = StreamResponseState::Streaming; ++ return; + } + + stream_data->outgoing.state = StreamResponseState::Streaming; diff --git a/proto/etcd.proto b/proto/etcd.proto index 34491d512..c1ad994db 100644 --- a/proto/etcd.proto +++ b/proto/etcd.proto @@ -29,6 +29,27 @@ message KeyValue int64 lease = 6; } +message Event { + enum EventType { + PUT = 0; + DELETE = 1; + } + // type is the kind of event. If type is a PUT, it indicates + // new data has been stored to the key. If type is a DELETE, + // it indicates the key was deleted. + EventType type = 1; + // kv holds the KeyValue for the event. + // A PUT event contains current kv pair. + // A PUT event with kv.Version=1 indicates the creation of a key. + // A DELETE/EXPIRE event contains the deleted key with + // its modification revision set to the revision of deletion. + KeyValue kv = 2; + + // prev_kv holds the key-value pair before the event happens. + KeyValue prev_kv = 3; +} + + message ResponseHeader { // option (versionpb.etcd_version_msg) = "3.0"; @@ -477,3 +498,110 @@ message StatusResponse // delay in relationship to the target cluster version. string storageVersion = 11; } + +message WatchRequest { + // option (versionpb.etcd_version_msg) = "3.0"; + // request_union is a request to either create a new watcher or cancel an existing watcher. + oneof request_union { + WatchCreateRequest create_request = 1; + WatchCancelRequest cancel_request = 2; + WatchProgressRequest progress_request = 3 ;//[(versionpb.etcd_version_field)="3.4"]; + } +} + +message WatchCreateRequest { + // option (versionpb.etcd_version_msg) = "3.0"; + + // key is the key to register for watching. + bytes key = 1; + + // range_end is the end of the range [key, range_end) to watch. If range_end is not given, + // only the key argument is watched. If range_end is equal to '\0', all keys greater than + // or equal to the key argument are watched. + // If the range_end is one bit larger than the given key, + // then all keys with the prefix (the given key) will be watched. + bytes range_end = 2; + + // start_revision is an optional revision to watch from (inclusive). No start_revision is "now". + int64 start_revision = 3; + + // progress_notify is set so that the etcd server will periodically send a WatchResponse with + // no events to the new watcher if there are no recent events. It is useful when clients + // wish to recover a disconnected watcher starting from a recent known revision. + // The etcd server may decide how often it will send notifications based on current load. + bool progress_notify = 4; + + enum FilterType { + // option (versionpb.etcd_version_enum) = "3.1"; + + // filter out put event. + NOPUT = 0; + // filter out delete event. + NODELETE = 1; + } + + // filters filter the events at server side before it sends back to the watcher. + repeated FilterType filters = 5 ;//[(versionpb.etcd_version_field)="3.1"]; + + // If prev_kv is set, created watcher gets the previous KV before the event happens. + // If the previous KV is already compacted, nothing will be returned. + bool prev_kv = 6 ;//[(versionpb.etcd_version_field)="3.1"]; + + // If watch_id is provided and non-zero, it will be assigned to this watcher. + // Since creating a watcher in etcd is not a synchronous operation, + // this can be used ensure that ordering is correct when creating multiple + // watchers on the same stream. Creating a watcher with an ID already in + // use on the stream will cause an error to be returned. + int64 watch_id = 7 ;//[(versionpb.etcd_version_field)="3.4"]; + + // fragment enables splitting large revisions into multiple watch responses. + bool fragment = 8 ;//[(versionpb.etcd_version_field)="3.4"]; +} + +message WatchCancelRequest { + // option (versionpb.etcd_version_msg) = "3.1"; + // watch_id is the watcher id to cancel so that no more events are transmitted. + int64 watch_id = 1 ;//[(versionpb.etcd_version_field)="3.1"]; +} + +// Requests the a watch stream progress status be sent in the watch response stream as soon as +// possible. +message WatchProgressRequest { + // option (versionpb.etcd_version_msg) = "3.4"; +} + +message WatchResponse { + // option (versionpb.etcd_version_msg) = "3.0"; + + ResponseHeader header = 1; + // watch_id is the ID of the watcher that corresponds to the response. + int64 watch_id = 2; + + // created is set to true if the response is for a create watch request. + // The client should record the watch_id and expect to receive events for + // the created watcher from the same stream. + // All events sent to the created watcher will attach with the same watch_id. + bool created = 3; + + // canceled is set to true if the response is for a cancel watch request. + // No further events will be sent to the canceled watcher. + bool canceled = 4; + + // compact_revision is set to the minimum index if a watcher tries to watch + // at a compacted index. + // + // This happens when creating a watcher at a compacted revision or the watcher cannot + // catch up with the progress of the key-value store. + // + // The client should treat the watcher as canceled and should not try to create any + // watcher with the same start_revision again. + int64 compact_revision = 5; + + // cancel_reason indicates the reason for canceling the watcher. + string cancel_reason = 6;// [(versionpb.etcd_version_field)="3.4"]; + + // framgment is true if large watch response was split over multiple responses. + bool fragment = 7 ;//[(versionpb.etcd_version_field)="3.4"]; + + repeated Event events = 11; +} \ No newline at end of file diff --git a/src/app/app.cpp b/src/app/app.cpp index a02236cba..cb2dea420 100644 --- a/src/app/app.cpp +++ b/src/app/app.cpp @@ -9,9 +9,11 @@ #include "ccf/historical_queries_adapter.h" #include "ccf/http_query.h" #include "ccf/json_handler.h" +#include "ccf/pal/locking.h" #include "ccf/service/tables/nodes.h" #include "ccf/service/tables/service.h" #include "endpoints/grpc/grpc.h" // TODO(#22): private header +#include "endpoints/grpc/stream.h" // TODO(#22): private header #include "etcd.pb.h" #include "grpc.h" #include "index.h" @@ -20,6 +22,7 @@ #include "leases.h" #include "lskvserver.pb.h" #include "node_data.h" +#include "tls/msg_types.h" // TODO(#22): private header #include @@ -255,6 +258,75 @@ namespace app etcdserverpb::StatusRequest, etcdserverpb::StatusResponse>( etcdserverpb, maintenance, "Status", "/v3/maintenance/status", status); + + auto watch = + [this]( + ccf::endpoints::CommandEndpointContext& ctx, + etcdserverpb::WatchRequest&& payload, + ccf::grpc::StreamPtr&& out_stream) { + // TODO: For now, CCF accepts incomplete client stream payloads so + // simply ignore those for now + if (!payload.has_create_request()) + { + return ccf::grpc::make_pending(); + } + + this->watch_impl(ctx, std::move(payload), std::move(out_stream)); + return ccf::grpc::make_pending(); + }; + make_command_endpoint( + "/etcdserverpb.Watch/Watch", + HTTP_POST, + ccf::grpc_command_unary_stream_adapter< + etcdserverpb::WatchRequest, + etcdserverpb::WatchResponse>(watch), + {ccf::no_auth_required}) + .install(); + } + + struct Watch + { + int64_t id; + ccf::grpc::DetachedStreamPtr stream; + }; + int64_t next_watch_id = 0; + + ccf::pal::Mutex watches_lock; + std::map watches; + + void watch_impl( + ccf::endpoints::CommandEndpointContext& ctx, + etcdserverpb::WatchRequest&& payload, + ccf::grpc::StreamPtr&& out_stream) + { + CCF_APP_INFO("Watch request received {}", payload.DebugString()); + + // TODO: Only support watch creation for now + if (payload.has_create_request()) + { + std::lock_guard guard(watches_lock); + + auto const& create_payload = payload.create_request(); + auto watch_id = next_watch_id++; + // notify the client of creation + { + etcdserverpb::WatchResponse response; + response.set_watch_id(watch_id); + response.set_created(true); + auto committed = last_committed_txid(); + fill_header(*response.mutable_header(), committed); + + out_stream->stream_msg(response); + } + + Watch watch = { + watch_id, + ccf::grpc::detach_stream(ctx.rpc_ctx, std::move(out_stream))}; + watches.emplace(std::make_pair(create_payload.key(), std::move(watch))); + + CCF_APP_INFO( + "Created watch {} for key {}", watch_id, create_payload.key()); + } } template @@ -650,6 +722,24 @@ namespace app SET_CUSTOM_CLAIMS(put) + std::lock_guard guard(watches_lock); + auto w = watches.find(payload.key()); + if (w != watches.end()) + { + auto& watch = w->second; + etcdserverpb::WatchResponse response; + response.set_watch_id(watch.id); + auto* event = response.add_events(); + event->set_type(etcdserverpb::Event::PUT); + auto* kv = event->mutable_kv(); + kv->set_key(payload.key()); + kv->set_value(payload.value()); + + auto committed = last_committed_txid(); + fill_header(*response.mutable_header(), committed); + watch.stream->stream_msg(response); + } + return ccf::grpc::make_success(put_response); } @@ -1432,7 +1522,6 @@ namespace app return out; } }; - } // namespace app namespace ccfapp