Skip to content
This repository has been archived by the owner on Mar 16, 2022. It is now read-only.

Commit

Permalink
Start on eventing support
Browse files Browse the repository at this point in the history
  • Loading branch information
jroper committed Oct 16, 2020
1 parent 80bd552 commit ffa306b
Show file tree
Hide file tree
Showing 31 changed files with 1,969 additions and 390 deletions.
48 changes: 48 additions & 0 deletions protocols/example/shoppingcart/products.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright 2019 Lightbend Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

syntax = "proto3";

import "google/protobuf/empty.proto";
import "cloudstate/entity_key.proto";

package com.example.shoppingcart;

option go_package = "tck/shoppingcart";

message UpdateCartQuantityRequest {
string product_id = 1 [(.cloudstate.entity_key) = true];
string user_id = 2;
int32 quantity = 3;
}

message RemoveProductFromCartRequest {
string product_id = 1 [(.cloudstate.entity_key) = true];
string user_id = 2;
}

message GetProductRequest {
string product_id = 1 [(.cloudstate.entity_key) = true];
}

message Product {
int32 total_quantities = 1;
int32 total_carts = 2;
}

service ShoppingCartProducts {
rpc UpdateCartQuantity (UpdateCartQuantityRequest) returns (google.protobuf.Empty);
rpc RemoveProductFromCart (RemoveProductFromCartRequest) returns (google.protobuf.Empty);
rpc GetProduct (GetProductRequest) returns (Product);
}
43 changes: 43 additions & 0 deletions protocols/example/shoppingcart/projection.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2019 Lightbend Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// This is the public API offered by the shopping cart entity.
syntax = "proto3";

import "google/protobuf/empty.proto";
import "cloudstate/eventing.proto";
import "shoppingcart/persistence/domain.proto";

package com.example.shoppingcart;

option go_package = "tck/shoppingcart";

service ShoppingCartProjection {
rpc HandleItemAdded (com.example.shoppingcart.persistence.ItemAdded) returns (google.protobuf.Empty) {
option (.cloudstate.eventing) = {
in: {
event_log: "shopping-cart"
}
};
}

rpc HandleItemRemoved (com.example.shoppingcart.persistence.ItemRemoved) returns (google.protobuf.Empty) {
option (.cloudstate.eventing) = {
in: {
event_log: "shopping-cart"
}
};
}

}
1 change: 0 additions & 1 deletion protocols/example/shoppingcart/shoppingcart.proto
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ service ShoppingCart {
post: "/cart/{user_id}/items/add",
body: "*",
};
option (.cloudstate.eventing).in = "items";
}

rpc RemoveItem(RemoveLineItem) returns (google.protobuf.Empty) {
Expand Down
2 changes: 1 addition & 1 deletion protocols/frontend/cloudstate/entity_key.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,5 @@ option java_package = "io.cloudstate";
option go_package = "github.com/cloudstateio/go-support/cloudstate;cloudstate";

extend google.protobuf.FieldOptions {
bool entity_key = 50002;
bool entity_key = 1080;
}
46 changes: 43 additions & 3 deletions protocols/frontend/cloudstate/eventing.proto
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,51 @@ option java_package = "io.cloudstate";
option java_multiple_files = true;
option java_outer_classname = "EventsProto";

// Eventing configuration for a gRPC method.
message Eventing {
string in = 1;
string out = 2; // Special value "discard" means do not publish
// The event source in configuration.
EventSource in = 1;

// The event destination out configuration.
//
// Optional, if unset, messages out will not be published anywhere.
EventDestination out = 2;
}

// Event source configuration
message EventSource {

// The consumer group id.
//
// By default, all rpc methods on a given service with the same source will be part of the same virtual consumer
// group, messages will be routed to the different methods by type. This can be used to override that, if you want
// multiple methods to act as independent consumers of the same source (ie, if you want the same event to be
// published to each consumer) then give each consumer a unique name.
//
// Note that this does depend on the event source supporting multiple consumer groups. Queue based event sources
// may not support this.
string consumer_group = 1;

oneof source {

// A topic source.
//
// This will consume events from the given topic name.
string topic = 2;

// An event log source.
//
// This will consume events from the given event log with the given persistence id.
string event_log = 3;
}
}

message EventDestination {
oneof destination {
string topic = 1;
}
}

extend google.protobuf.MethodOptions {
Eventing eventing = 50003;
Eventing eventing = 1081;
}
30 changes: 30 additions & 0 deletions protocols/frontend/cloudstate/legacy_entity_key.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright 2019 Lightbend Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Extension for specifying which field in a message is to be considered an
// entity key, for the purposes associating gRPC calls with entities and
// sharding.

syntax = "proto3";

import "google/protobuf/descriptor.proto";

package cloudstate;

option java_package = "io.cloudstate";
option go_package = "github.com/cloudstateio/go-support/cloudstate/;cloudstate";

extend google.protobuf.FieldOptions {
bool legacy_entity_key = 50002;
}
11 changes: 8 additions & 3 deletions protocols/protocol/cloudstate/entity.proto
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ message MetadataEntry {

// A reply to the sender.
message Reply {

// The reply payload
google.protobuf.Any payload = 1;

Expand All @@ -103,21 +104,25 @@ message Reply {
// Not all transports support per message metadata, for example, gRPC doesn't. The Cloudstate proxy MAY ignore the
// metadata in this case, or it MAY lift the metadata into another place, for example, in gRPC, a unary call MAY
// have its reply metadata placed in the headers of the HTTP response, or the first reply to a streamed call MAY
// have its metadata placed in the headers of the HTTP response.
// if its metadata placed in the headers of the HTTP response.
//
// If the metadata is ignored, the Cloudstate proxy MAY notify the user function by sending an error message to the
// EntityDiscovery.ReportError gRPC call.
Metadata metadata = 2;
cloudstate.Metadata metadata = 2;
}

// Forwards handling of this request to another entity.
message Forward {

// The name of the service to forward to.
string service_name = 1;

// The name of the command.
string command_name = 2;

// The payload.
google.protobuf.Any payload = 3;

// The metadata to include with the forward
Metadata metadata = 4;
}
Expand Down Expand Up @@ -172,7 +177,7 @@ message Command {
// The command payload.
google.protobuf.Any payload = 4;

// Whether the command is streamed or not
// Whether the command is streamed or not.
bool streamed = 5;

// The command metadata.
Expand Down
17 changes: 15 additions & 2 deletions proxy/core/src/main/resources/in-memory.conf
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,26 @@ include "cloudstate-common"

akka.persistence {

journal.plugin = "akka.persistence.journal.inmem"
journal.plugin = inmem-journal
snapshot-store.plugin = inmem-snapshot-store


}

inmem-journal {
class = "akka.persistence.cloudstate.InmemJournal"
}

inmem-snapshot-store {
class = "io.cloudstate.proxy.eventsourced.InMemSnapshotStore"
}

cloudstate.proxy.journal-enabled = true
inmem-read-journal {
class = "akka.persistence.cloudstate.InmemReadJournal"
}

cloudstate.proxy.journal {
enabled = true
read-journal = inmem-read-journal
offset-store = "io.cloudstate.proxy.eventing.InMemoryOffsetTracking"
}
12 changes: 10 additions & 2 deletions proxy/core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,16 @@ cloudstate.proxy {
random-factor = 0.2
}

# If using a config that enables it, then set to true
journal-enabled = false
journal {
# If using a config that enables it, then set to true
enabled = false

# The id of the read journal plugin
read-journal = ""

# The FQCN of the offset store
offset-store = ""
}

telemetry {
# Whether telemetry (instrumentation and prometheus exporter) should be disabled
Expand Down
Loading

0 comments on commit ffa306b

Please sign in to comment.