-
Notifications
You must be signed in to change notification settings - Fork 97
Conversation
|
||
import scala.collection.mutable | ||
|
||
object MergeSequence { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Didn't this already get added to akka?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes it did, but my initial aim was just to get the branch rebased and compiling, which is what this is.
👍 Sounds good to switch over to Akka projections. |
fd3f698
to
1ac498c
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. There's a lot there, with some complexity in places. Examples and tests are looking good 👍
proxy/core/src/main/scala/io/cloudstate/proxy/eventing/EventingManager.scala
Show resolved
Hide resolved
proxy/core/src/main/scala/io/cloudstate/proxy/eventing/GooglePubsubEventing.scala
Show resolved
Hide resolved
proxy/core/src/main/scala/io/cloudstate/proxy/eventing/GooglePubsubEventing.scala
Outdated
Show resolved
Hide resolved
def eventsByTagQuery(tag: String, fromOffset: Long): Source[EventEnvelope, NotUsed] = | ||
// Technically, there's a race condition that could result in messages being dropped here, if a new message is | ||
// persisted after this returns, but before the subscriber to the query materializes this stream, that message | ||
// will be dropped. But this is only the in memory journal which makes no sense to use in production. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fair enough that it's likely not needed. If we did want to close this gap, I guess it could retrieve the old messages more as a lazy source once materialized, or something similar.
proxy/core/src/main/scala/io/cloudstate/proxy/eventsourced/InMemJournal.scala
Show resolved
Hide resolved
Failing in the TCK. Unexpected connection type. We could have these go through different probes, so the tests don't affect each other, but it's also useful to know that something wasn't expected. |
The TCK failure is because I'd created an event sourced subscriber in the JS shopping cart example. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Nice work!
import scala.concurrent.Future | ||
import scala.util.{Failure, Success} | ||
|
||
final class InterceptActionService(context: InterceptorContext) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good, was just about to add the intercept for actions.
nice work! |
// - The `Effect` method receives an `EffectRequest` message and must respond with a `Response` that contains the id | ||
// from the effect message. | ||
// - The `ProcessAnyEvent` method receives a `google.protobuf.Any`, which will contain JSON serialized according to | ||
// the Cloudstate JSON serialization conventions, with a `type_url` of `json.cloudstate.io/JsonEvent`. The contents |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the mandated value for the type (not the prefix) of json.cloudstate.io/JsonEvent
here can't be enforced as different languages choose "a path determined using a language-specific mechanism" for the type serialized as per https://cloudstate.io/docs/contribute/serialization.html#json-values.
Although the TCK does not enforce the type_url
described here:
b722f86#diff-e685c4ba3ccc54f02fab8a427f5f505c63fc8edd56b92dab716e934ccacb2b9aR3295
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The suffix can be anything - but the proxy handles JSON in a particular way, so the TCK does indirectly enforce it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The suffix can be anything
Then this:
with a
type_url
ofjson.cloudstate.io/JsonEvent
is inaccurate I think. The suffix is out of reach to be defined by the TCK model implemented. Instead it's what the language support has decided to be for a type of a languages types for the Cloudstate JSON serialisation implementation.
// | ||
// - Reply: reply with the given message in a `Response`. | ||
// - Forward: forward to another service, in place of replying with a `Response`. | ||
// - SideEffect: add a side effect to the current reply, forward, or failure. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SideEffect is missing yet in ProcessStep
.
// - Contain a single `message` property with the value of the `message` field in `JsonEvent`. | ||
// - Be serialized according to the Cloudstate JSON serialization conventions - that is, with the JSON serialized to | ||
// bytes, then placed into a protobuf message with a single bytes field with field number 1. | ||
// - Have a type_url of `json.cloudstate.io/JsonEvent`. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Java Support implementation does return here a JsonMessage
, instead of a JsonEvent
as an Event.
Could we write that the type_url has to have a prefix of ``json.cloudstate.io/`?
This is the old pull request #353, rebased, and got to the point of compilation. Haven't tried running yet. My next major task is to switch to using Akka projections.