Skip to content
This repository has been archived by the owner on Mar 16, 2022. It is now read-only.

Start on eventing support #465

Merged
merged 1 commit into from
Dec 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ val AkkaManagementVersion = "1.0.8"
val AkkaPersistenceCassandraVersion = "0.102"
val AkkaPersistenceJdbcVersion = "3.5.2"
val AkkaPersistenceSpannerVersion = "1.0.0-RC4"
val AkkaProjectionsVersion = "1.0.0"
val PrometheusClientVersion = "0.9.0"
val ScalaTestVersion = "3.0.8"
val ProtobufVersion = "3.11.4" // Note: sync with Protobuf version in Akka gRPC and ScalaPB
Expand Down Expand Up @@ -77,6 +78,9 @@ def akkaDiscoveryDependency(name: String, excludeThese: ExclusionRule*) =
def akkaPersistenceCassandraDependency(name: String, excludeThese: ExclusionRule*) =
"com.typesafe.akka" %% name % AkkaPersistenceCassandraVersion excludeAll ((excludeTheseDependencies ++ excludeThese): _*)

def akkaProjectionsDependency(name: String, excludeThese: ExclusionRule*) =
"com.lightbend.akka" %% name % AkkaProjectionsVersion excludeAll ((excludeTheseDependencies ++ excludeThese): _*)

def common: Seq[Setting[_]] = automateHeaderSettings(Compile, Test) ++ Seq(
headerMappings := headerMappings.value ++ Seq(
de.heikoseeberger.sbtheader.FileType("proto") -> HeaderCommentStyle.cppStyleLineComment,
Expand Down Expand Up @@ -388,13 +392,17 @@ lazy val `proxy-core` = (project in file("proxy/core"))
akkaDependency("akka-stream"),
akkaDependency("akka-slf4j"),
akkaDependency("akka-discovery"),
akkaDependency("akka-cluster-typed"),
akkaHttpDependency("akka-http"),
akkaHttpDependency("akka-http-spray-json"),
akkaHttpDependency("akka-http-core"),
akkaHttpDependency("akka-http2-support"),
akkaDependency("akka-cluster-sharding", ExclusionRule("org.lmdbjava", "lmdbjava")),
akkaManagementDependency("akka-management-cluster-bootstrap"),
akkaDiscoveryDependency("akka-discovery-kubernetes-api"),
akkaProjectionsDependency("akka-projection-core"),
akkaProjectionsDependency("akka-projection-eventsourced"),
akkaProjectionsDependency("akka-projection-testkit"), // Needed for in memory support
"com.google.protobuf" % "protobuf-java" % ProtobufVersion % "protobuf",
"com.google.protobuf" % "protobuf-java-util" % ProtobufVersion,
"org.scalatest" %% "scalatest" % ScalaTestVersion % Test,
Expand Down Expand Up @@ -458,7 +466,8 @@ lazy val `proxy-cassandra` = (project in file("proxy/cassandra"))
dependencyOverrides += "io.grpc" % "grpc-netty-shaded" % GrpcNettyShadedVersion,
libraryDependencies ++= Seq(
akkaPersistenceCassandraDependency("akka-persistence-cassandra", ExclusionRule("com.github.jnr")),
akkaPersistenceCassandraDependency("akka-persistence-cassandra-launcher") % Test
akkaPersistenceCassandraDependency("akka-persistence-cassandra-launcher") % Test,
"com.lightbend.akka" %% "akka-projection-cassandra" % AkkaProjectionsVersion
),
fork in run := true,
mainClass in Compile := Some("io.cloudstate.proxy.CloudStateProxyMain"),
Expand All @@ -479,6 +488,7 @@ lazy val `proxy-jdbc` = (project in file("proxy/jdbc"))
dependencyOverrides += "io.grpc" % "grpc-netty-shaded" % GrpcNettyShadedVersion,
libraryDependencies ++= Seq(
"com.github.dnvriend" %% "akka-persistence-jdbc" % AkkaPersistenceJdbcVersion,
"com.lightbend.akka" %% "akka-projection-slick" % AkkaProjectionsVersion,
"org.scalatest" %% "scalatest" % ScalaTestVersion % Test
),
fork in run := true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,42 +22,6 @@

import com.oracle.svm.core.annotate.*;

@TargetClass(
className = "io.netty.util.internal.shaded.org.jctools.util.UnsafeRefArrayAccess",
onlyWith = Existence.class)
final class Target_io_netty_util_internal_shaded_org_jctools_util_UnsafeRefArrayAccess {
@Alias
@RecomputeFieldValue(kind = RecomputeFieldValue.Kind.ArrayIndexShift, declClass = Object[].class)
public static int REF_ELEMENT_SHIFT;
}

@TargetClass(className = "io.netty.util.internal.CleanerJava6", onlyWith = Existence.class)
final class Target_io_netty_util_internal_CleanerJava6 {
@Alias
@RecomputeFieldValue(
kind = RecomputeFieldValue.Kind.FieldOffset,
declClassName = "java.nio.DirectByteBuffer",
name = "cleaner")
private static long CLEANER_FIELD_OFFSET;
}

@TargetClass(className = "io.netty.util.internal.PlatformDependent", onlyWith = Existence.class)
final class Target_io_netty_util_internal_PlatformDependent {
@Alias
@RecomputeFieldValue(kind = RecomputeFieldValue.Kind.ArrayBaseOffset, declClass = byte[].class)
private static long ARRAY_BASE_OFFSET;
}

@TargetClass(className = "io.netty.util.internal.PlatformDependent0", onlyWith = Existence.class)
final class Target_io_netty_util_internal_PlatformDependent0 {
@Alias
@RecomputeFieldValue(
kind = RecomputeFieldValue.Kind.FieldOffset,
declClassName = "java.nio.Buffer",
name = "address")
private static long ADDRESS_FIELD_OFFSET;
}

@TargetClass(className = "org.agrona.concurrent.AbstractConcurrentArrayQueue")
final class Target_org_agrona_concurrent_AbstractConcurrentArrayQueue {
@Alias
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ object AnySupport {
.asInstanceOf[Seq[(String, Primitive[Any])]]
.toMap

private final val objectMapper = new ObjectMapper()
final val objectMapper = new ObjectMapper()

private def primitiveToBytes[T](primitive: Primitive[T], value: T): ByteString =
if (value != primitive.defaultValue) {
Expand Down Expand Up @@ -178,6 +178,8 @@ object AnySupport {
descriptor.getDependencies.asScala.toSeq ++ descriptor.getPublicDependencies.asScala)
}
}

def extractBytes(bytes: ByteString): ByteString = bytesToPrimitive(BytesPrimitive, bytes)
}

class AnySupport(descriptors: Array[Descriptors.FileDescriptor],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,21 @@ import java.lang.annotation.Annotation
import java.lang.reflect.{AccessibleObject, Executable, Member, Method, ParameterizedType, Type, WildcardType}
import java.util.Optional

import akka.NotUsed
import io.cloudstate.javasupport.{
CloudEvent,
Context,
EntityContext,
EntityId,
Jsonable,
Metadata,
MetadataContext,
ServiceCallFactory
}
import com.google.protobuf.{Any => JavaPbAny}

import scala.reflect.ClassTag
import scala.runtime.BoxedUnit

/**
* How we do reflection:
Expand Down Expand Up @@ -208,13 +211,12 @@ private[impl] object ReflectionHelper {

verifyAtMostOneMainArgument("CommandHandler", method, parameters)

parameters.foreach {
case MainArgumentParameterHandler(inClass) if !inClass.isAssignableFrom(serviceMethod.inputType.typeClass) =>
throw new RuntimeException(
s"Incompatible command class $inClass for command $name, expected ${serviceMethod.inputType.typeClass}"
)
case _ =>
}
val mainArgumentDecoder: JavaPbAny => AnyRef = parameters
.collectFirst {
case MainArgumentParameterHandler(inClass) =>
getMainArgumentDecoder(name, inClass, serviceMethod.inputType)
}
.getOrElse(_ => NotUsed)

private def serialize(result: AnyRef) =
JavaPbAny
Expand Down Expand Up @@ -249,13 +251,31 @@ private[impl] object ReflectionHelper {
}

def invoke(obj: AnyRef, command: JavaPbAny, context: CommandContext): Optional[JavaPbAny] = {
val decodedCommand = serviceMethod.inputType.parseFrom(command.getValue).asInstanceOf[AnyRef]
val decodedCommand = mainArgumentDecoder(command)
val ctx = InvocationContext(decodedCommand, context)
val result = method.invoke(obj, parameters.map(_.apply(ctx)): _*)
handleResult(result)
}
}

def getMainArgumentDecoder(name: String, actualType: Class[_], pbType: ResolvedType[_]): JavaPbAny => AnyRef =
if (actualType.isAssignableFrom(pbType.typeClass)) { pbAny =>
pbType.parseFrom(pbAny.getValue).asInstanceOf[AnyRef]
} else if (pbType.typeClass.equals(classOf[JavaPbAny]) && actualType.getAnnotation(classOf[Jsonable]) != null) {
val reader = AnySupport.objectMapper.readerFor(actualType)
pbAny => {
if (pbAny.getTypeUrl.startsWith(AnySupport.CloudStateJson)) {
reader.readValue(AnySupport.extractBytes(pbAny.getValue).newInput()).asInstanceOf[AnyRef]
} else {
throw new RuntimeException(
s"Don't know how to deserialize protobuf Any type with type URL ${pbAny.getTypeUrl} "
)
}
}
} else {
throw new RuntimeException(s"Incompatible input class $actualType for call $name, expected ${pbType.typeClass}")
}

def getRawType(t: Type): Class[_] = t match {
case clazz: Class[_] => clazz
case pt: ParameterizedType => getRawType(pt.getRawType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.util.concurrent.{CompletableFuture, CompletionStage}

import akka.NotUsed
import akka.stream.{javadsl, Materializer}
import akka.stream.javadsl.{AsPublisher, Source}
import akka.stream.javadsl.Source
import akka.stream.scaladsl.{JavaFlowSupport, Sink}
import com.google.protobuf.{Descriptors, Any => JavaPbAny}
import io.cloudstate.javasupport.action._
Expand All @@ -33,7 +33,6 @@ import io.cloudstate.javasupport.impl.{
ResolvedServiceMethod,
ResolvedType
}
import io.cloudstate.javasupport.Metadata

/**
* Annotation based implementation of the [[ActionHandler]].
Expand Down Expand Up @@ -247,23 +246,19 @@ private object ActionReflection {
ReflectionHelper.getRawType(parameterType) match {
case envelope if envelope == classOf[MessageEnvelope[_]] =>
val messageType = ReflectionHelper.getFirstParameter(parameterType)
if (messageType != resolvedType.typeClass) {
throw new RuntimeException(
s"Incompatible message class $messageType for call $method, expected ${resolvedType.typeClass}"
)
} else { envelope =>
val decoder = ReflectionHelper.getMainArgumentDecoder(method, messageType, resolvedType)

{ envelope =>
MessageEnvelope.of(
resolvedType.parseFrom(envelope.payload.getValue).asInstanceOf[AnyRef],
decoder(envelope.payload),
envelope.metadata
)
}
case payload =>
if (payload != resolvedType.typeClass) {
throw new RuntimeException(
s"Incompatible message class $payload for call $method, expected ${resolvedType.typeClass}"
)
} else { envelope =>
resolvedType.parseFrom(envelope.payload.getValue).asInstanceOf[AnyRef]
val decoder = ReflectionHelper.getMainArgumentDecoder(method, payload, resolvedType)

{ envelope =>
decoder(envelope.payload)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.example.valueentity.shoppingcart.Shoppingcart;
import io.cloudstate.javasupport.CloudState;
import io.cloudstate.javasupport.tck.model.eventlogeventing.EventLogSubscriber;
import io.cloudstate.javasupport.tck.model.valuebased.ValueEntityTckModelEntity;
import io.cloudstate.javasupport.tck.model.valuebased.ValueEntityTwoEntity;
import io.cloudstate.javasupport.tck.model.action.ActionTckModelBehavior;
Expand All @@ -29,6 +30,7 @@
import io.cloudstate.samples.shoppingcart.ShoppingCartEntity;
import io.cloudstate.tck.model.Action;
import io.cloudstate.tck.model.Crdt;
import io.cloudstate.tck.model.Eventlogeventing;
import io.cloudstate.tck.model.Eventsourced;
import io.cloudstate.tck.model.valueentity.Valueentity;

Expand Down Expand Up @@ -66,6 +68,17 @@ public static final void main(String[] args) throws Exception {
.registerEventSourcedEntity(
EventSourcedTwoEntity.class,
Eventsourced.getDescriptor().findServiceByName("EventSourcedTwo"))
.registerAction(
new EventLogSubscriber(),
Eventlogeventing.getDescriptor().findServiceByName("EventLogSubscriberModel"))
.registerEventSourcedEntity(
io.cloudstate.javasupport.tck.model.eventlogeventing.EventSourcedEntityOne.class,
Eventlogeventing.getDescriptor().findServiceByName("EventSourcedEntityOne"),
Eventlogeventing.getDescriptor())
.registerEventSourcedEntity(
io.cloudstate.javasupport.tck.model.eventlogeventing.EventSourcedEntityTwo.class,
Eventlogeventing.getDescriptor().findServiceByName("EventSourcedEntityTwo"),
Eventlogeventing.getDescriptor())
.registerEventSourcedEntity(
io.cloudstate.samples.eventsourced.shoppingcart.ShoppingCartEntity.class,
com.example.shoppingcart.Shoppingcart.getDescriptor().findServiceByName("ShoppingCart"),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright 2019 Lightbend Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.cloudstate.javasupport.tck.model.eventlogeventing;

import akka.NotUsed;
import akka.stream.javadsl.Source;
import io.cloudstate.javasupport.CloudEvent;
import io.cloudstate.javasupport.action.Action;
import io.cloudstate.javasupport.action.ActionContext;
import io.cloudstate.javasupport.action.ActionReply;
import io.cloudstate.javasupport.action.CallHandler;
import io.cloudstate.tck.model.EventLogSubscriberModel;
import io.cloudstate.tck.model.Eventlogeventing;

@Action
public class EventLogSubscriber {

@CallHandler
public ActionReply<Eventlogeventing.Response> processEventOne(
ActionContext context, CloudEvent cloudEvent, Eventlogeventing.EventOne eventOne) {
return convert(context, cloudEvent, eventOne.getStep());
}

@CallHandler
public Source<ActionReply<Eventlogeventing.Response>, NotUsed> processEventTwo(
ActionContext context, CloudEvent cloudEvent, Eventlogeventing.EventTwo eventTwo) {
return Source.from(eventTwo.getStepList()).map(step -> convert(context, cloudEvent, step));
}

@CallHandler
public Eventlogeventing.Response effect(Eventlogeventing.EffectRequest request) {
return Eventlogeventing.Response.newBuilder()
.setId(request.getId())
.setMessage(request.getMessage())
.build();
}

@CallHandler
public Eventlogeventing.Response processAnyEvent(JsonMessage jsonMessage, CloudEvent cloudEvent) {
return Eventlogeventing.Response.newBuilder()
.setId(cloudEvent.subject().orElse(""))
.setMessage(jsonMessage.message)
.build();
}

private ActionReply<Eventlogeventing.Response> convert(
ActionContext context, CloudEvent cloudEvent, Eventlogeventing.ProcessStep step) {
String id = cloudEvent.subject().orElse("");
if (step.hasReply()) {
return ActionReply.message(
Eventlogeventing.Response.newBuilder()
.setId(id)
.setMessage(step.getReply().getMessage())
.build());
} else if (step.hasForward()) {
return ActionReply.forward(
context
.serviceCallFactory()
.lookup(EventLogSubscriberModel.name, "Effect", Eventlogeventing.EffectRequest.class)
.createCall(
Eventlogeventing.EffectRequest.newBuilder()
.setId(id)
.setMessage(step.getForward().getMessage())
.build()));
} else {
throw new RuntimeException("No reply or forward");
}
}
}
Loading