Skip to content
This repository has been archived by the owner on Mar 16, 2022. It is now read-only.

Commit

Permalink
Eventing support
Browse files Browse the repository at this point in the history
  • Loading branch information
jroper committed Dec 2, 2020
1 parent ab459ef commit 1ac498c
Show file tree
Hide file tree
Showing 59 changed files with 2,708 additions and 461 deletions.
12 changes: 11 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ val AkkaManagementVersion = "1.0.8"
val AkkaPersistenceCassandraVersion = "0.102"
val AkkaPersistenceJdbcVersion = "3.5.2"
val AkkaPersistenceSpannerVersion = "1.0.0-RC4"
val AkkaProjectionsVersion = "1.0.0"
val PrometheusClientVersion = "0.9.0"
val ScalaTestVersion = "3.0.8"
val ProtobufVersion = "3.11.4" // Note: sync with Protobuf version in Akka gRPC and ScalaPB
Expand Down Expand Up @@ -77,6 +78,9 @@ def akkaDiscoveryDependency(name: String, excludeThese: ExclusionRule*) =
def akkaPersistenceCassandraDependency(name: String, excludeThese: ExclusionRule*) =
"com.typesafe.akka" %% name % AkkaPersistenceCassandraVersion excludeAll ((excludeTheseDependencies ++ excludeThese): _*)

def akkaProjectionsDependency(name: String, excludeThese: ExclusionRule*) =
"com.lightbend.akka" %% name % AkkaProjectionsVersion excludeAll ((excludeTheseDependencies ++ excludeThese): _*)

def common: Seq[Setting[_]] = automateHeaderSettings(Compile, Test) ++ Seq(
headerMappings := headerMappings.value ++ Seq(
de.heikoseeberger.sbtheader.FileType("proto") -> HeaderCommentStyle.cppStyleLineComment,
Expand Down Expand Up @@ -388,13 +392,17 @@ lazy val `proxy-core` = (project in file("proxy/core"))
akkaDependency("akka-stream"),
akkaDependency("akka-slf4j"),
akkaDependency("akka-discovery"),
akkaDependency("akka-cluster-typed"),
akkaHttpDependency("akka-http"),
akkaHttpDependency("akka-http-spray-json"),
akkaHttpDependency("akka-http-core"),
akkaHttpDependency("akka-http2-support"),
akkaDependency("akka-cluster-sharding", ExclusionRule("org.lmdbjava", "lmdbjava")),
akkaManagementDependency("akka-management-cluster-bootstrap"),
akkaDiscoveryDependency("akka-discovery-kubernetes-api"),
akkaProjectionsDependency("akka-projection-core"),
akkaProjectionsDependency("akka-projection-eventsourced"),
akkaProjectionsDependency("akka-projection-testkit"), // Needed for in memory support
"com.google.protobuf" % "protobuf-java" % ProtobufVersion % "protobuf",
"com.google.protobuf" % "protobuf-java-util" % ProtobufVersion,
"org.scalatest" %% "scalatest" % ScalaTestVersion % Test,
Expand Down Expand Up @@ -458,7 +466,8 @@ lazy val `proxy-cassandra` = (project in file("proxy/cassandra"))
dependencyOverrides += "io.grpc" % "grpc-netty-shaded" % GrpcNettyShadedVersion,
libraryDependencies ++= Seq(
akkaPersistenceCassandraDependency("akka-persistence-cassandra", ExclusionRule("com.github.jnr")),
akkaPersistenceCassandraDependency("akka-persistence-cassandra-launcher") % Test
akkaPersistenceCassandraDependency("akka-persistence-cassandra-launcher") % Test,
"com.lightbend.akka" %% "akka-projection-cassandra" % AkkaProjectionsVersion
),
fork in run := true,
mainClass in Compile := Some("io.cloudstate.proxy.CloudStateProxyMain"),
Expand All @@ -479,6 +488,7 @@ lazy val `proxy-jdbc` = (project in file("proxy/jdbc"))
dependencyOverrides += "io.grpc" % "grpc-netty-shaded" % GrpcNettyShadedVersion,
libraryDependencies ++= Seq(
"com.github.dnvriend" %% "akka-persistence-jdbc" % AkkaPersistenceJdbcVersion,
"com.lightbend.akka" %% "akka-projection-slick" % AkkaProjectionsVersion,
"org.scalatest" %% "scalatest" % ScalaTestVersion % Test
),
fork in run := true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,42 +22,6 @@

import com.oracle.svm.core.annotate.*;

@TargetClass(
className = "io.netty.util.internal.shaded.org.jctools.util.UnsafeRefArrayAccess",
onlyWith = Existence.class)
final class Target_io_netty_util_internal_shaded_org_jctools_util_UnsafeRefArrayAccess {
@Alias
@RecomputeFieldValue(kind = RecomputeFieldValue.Kind.ArrayIndexShift, declClass = Object[].class)
public static int REF_ELEMENT_SHIFT;
}

@TargetClass(className = "io.netty.util.internal.CleanerJava6", onlyWith = Existence.class)
final class Target_io_netty_util_internal_CleanerJava6 {
@Alias
@RecomputeFieldValue(
kind = RecomputeFieldValue.Kind.FieldOffset,
declClassName = "java.nio.DirectByteBuffer",
name = "cleaner")
private static long CLEANER_FIELD_OFFSET;
}

@TargetClass(className = "io.netty.util.internal.PlatformDependent", onlyWith = Existence.class)
final class Target_io_netty_util_internal_PlatformDependent {
@Alias
@RecomputeFieldValue(kind = RecomputeFieldValue.Kind.ArrayBaseOffset, declClass = byte[].class)
private static long ARRAY_BASE_OFFSET;
}

@TargetClass(className = "io.netty.util.internal.PlatformDependent0", onlyWith = Existence.class)
final class Target_io_netty_util_internal_PlatformDependent0 {
@Alias
@RecomputeFieldValue(
kind = RecomputeFieldValue.Kind.FieldOffset,
declClassName = "java.nio.Buffer",
name = "address")
private static long ADDRESS_FIELD_OFFSET;
}

@TargetClass(className = "org.agrona.concurrent.AbstractConcurrentArrayQueue")
final class Target_org_agrona_concurrent_AbstractConcurrentArrayQueue {
@Alias
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ object AnySupport {
.asInstanceOf[Seq[(String, Primitive[Any])]]
.toMap

private final val objectMapper = new ObjectMapper()
final val objectMapper = new ObjectMapper()

private def primitiveToBytes[T](primitive: Primitive[T], value: T): ByteString =
if (value != primitive.defaultValue) {
Expand Down Expand Up @@ -178,6 +178,8 @@ object AnySupport {
descriptor.getDependencies.asScala.toSeq ++ descriptor.getPublicDependencies.asScala)
}
}

def extractBytes(bytes: ByteString): ByteString = bytesToPrimitive(BytesPrimitive, bytes)
}

class AnySupport(descriptors: Array[Descriptors.FileDescriptor],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,21 @@ import java.lang.annotation.Annotation
import java.lang.reflect.{AccessibleObject, Executable, Member, Method, ParameterizedType, Type, WildcardType}
import java.util.Optional

import akka.NotUsed
import io.cloudstate.javasupport.{
CloudEvent,
Context,
EntityContext,
EntityId,
Jsonable,
Metadata,
MetadataContext,
ServiceCallFactory
}
import com.google.protobuf.{Any => JavaPbAny}

import scala.reflect.ClassTag
import scala.runtime.BoxedUnit

/**
* How we do reflection:
Expand Down Expand Up @@ -208,13 +211,12 @@ private[impl] object ReflectionHelper {

verifyAtMostOneMainArgument("CommandHandler", method, parameters)

parameters.foreach {
case MainArgumentParameterHandler(inClass) if !inClass.isAssignableFrom(serviceMethod.inputType.typeClass) =>
throw new RuntimeException(
s"Incompatible command class $inClass for command $name, expected ${serviceMethod.inputType.typeClass}"
)
case _ =>
}
val mainArgumentDecoder: JavaPbAny => AnyRef = parameters
.collectFirst {
case MainArgumentParameterHandler(inClass) =>
getMainArgumentDecoder(name, inClass, serviceMethod.inputType)
}
.getOrElse(_ => NotUsed)

private def serialize(result: AnyRef) =
JavaPbAny
Expand Down Expand Up @@ -249,13 +251,31 @@ private[impl] object ReflectionHelper {
}

def invoke(obj: AnyRef, command: JavaPbAny, context: CommandContext): Optional[JavaPbAny] = {
val decodedCommand = serviceMethod.inputType.parseFrom(command.getValue).asInstanceOf[AnyRef]
val decodedCommand = mainArgumentDecoder(command)
val ctx = InvocationContext(decodedCommand, context)
val result = method.invoke(obj, parameters.map(_.apply(ctx)): _*)
handleResult(result)
}
}

def getMainArgumentDecoder(name: String, actualType: Class[_], pbType: ResolvedType[_]): JavaPbAny => AnyRef =
if (actualType.isAssignableFrom(pbType.typeClass)) { pbAny =>
pbType.parseFrom(pbAny.getValue).asInstanceOf[AnyRef]
} else if (pbType.typeClass.equals(classOf[JavaPbAny]) && actualType.getAnnotation(classOf[Jsonable]) != null) {
val reader = AnySupport.objectMapper.readerFor(actualType)
pbAny => {
if (pbAny.getTypeUrl.startsWith(AnySupport.CloudStateJson)) {
reader.readValue(AnySupport.extractBytes(pbAny.getValue).newInput()).asInstanceOf[AnyRef]
} else {
throw new RuntimeException(
s"Don't know how to deserialize protobuf Any type with type URL ${pbAny.getTypeUrl} "
)
}
}
} else {
throw new RuntimeException(s"Incompatible input class $actualType for call $name, expected ${pbType.typeClass}")
}

def getRawType(t: Type): Class[_] = t match {
case clazz: Class[_] => clazz
case pt: ParameterizedType => getRawType(pt.getRawType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.util.concurrent.{CompletableFuture, CompletionStage}

import akka.NotUsed
import akka.stream.{javadsl, Materializer}
import akka.stream.javadsl.{AsPublisher, Source}
import akka.stream.javadsl.Source
import akka.stream.scaladsl.{JavaFlowSupport, Sink}
import com.google.protobuf.{Descriptors, Any => JavaPbAny}
import io.cloudstate.javasupport.action._
Expand All @@ -33,7 +33,6 @@ import io.cloudstate.javasupport.impl.{
ResolvedServiceMethod,
ResolvedType
}
import io.cloudstate.javasupport.Metadata

/**
* Annotation based implementation of the [[ActionHandler]].
Expand Down Expand Up @@ -247,23 +246,19 @@ private object ActionReflection {
ReflectionHelper.getRawType(parameterType) match {
case envelope if envelope == classOf[MessageEnvelope[_]] =>
val messageType = ReflectionHelper.getFirstParameter(parameterType)
if (messageType != resolvedType.typeClass) {
throw new RuntimeException(
s"Incompatible message class $messageType for call $method, expected ${resolvedType.typeClass}"
)
} else { envelope =>
val decoder = ReflectionHelper.getMainArgumentDecoder(method, messageType, resolvedType)

{ envelope =>
MessageEnvelope.of(
resolvedType.parseFrom(envelope.payload.getValue).asInstanceOf[AnyRef],
decoder(envelope.payload),
envelope.metadata
)
}
case payload =>
if (payload != resolvedType.typeClass) {
throw new RuntimeException(
s"Incompatible message class $payload for call $method, expected ${resolvedType.typeClass}"
)
} else { envelope =>
resolvedType.parseFrom(envelope.payload.getValue).asInstanceOf[AnyRef]
val decoder = ReflectionHelper.getMainArgumentDecoder(method, payload, resolvedType)

{ envelope =>
decoder(envelope.payload)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.example.valueentity.shoppingcart.Shoppingcart;
import io.cloudstate.javasupport.CloudState;
import io.cloudstate.javasupport.tck.model.eventlogeventing.EventLogSubscriber;
import io.cloudstate.javasupport.tck.model.valuebased.ValueEntityTckModelEntity;
import io.cloudstate.javasupport.tck.model.valuebased.ValueEntityTwoEntity;
import io.cloudstate.javasupport.tck.model.action.ActionTckModelBehavior;
Expand All @@ -29,6 +30,7 @@
import io.cloudstate.samples.shoppingcart.ShoppingCartEntity;
import io.cloudstate.tck.model.Action;
import io.cloudstate.tck.model.Crdt;
import io.cloudstate.tck.model.Eventlogeventing;
import io.cloudstate.tck.model.Eventsourced;
import io.cloudstate.tck.model.valueentity.Valueentity;

Expand Down Expand Up @@ -66,6 +68,17 @@ public static final void main(String[] args) throws Exception {
.registerEventSourcedEntity(
EventSourcedTwoEntity.class,
Eventsourced.getDescriptor().findServiceByName("EventSourcedTwo"))
.registerAction(
new EventLogSubscriber(),
Eventlogeventing.getDescriptor().findServiceByName("EventLogSubscriberModel"))
.registerEventSourcedEntity(
io.cloudstate.javasupport.tck.model.eventlogeventing.EventSourcedEntityOne.class,
Eventlogeventing.getDescriptor().findServiceByName("EventSourcedEntityOne"),
Eventlogeventing.getDescriptor())
.registerEventSourcedEntity(
io.cloudstate.javasupport.tck.model.eventlogeventing.EventSourcedEntityTwo.class,
Eventlogeventing.getDescriptor().findServiceByName("EventSourcedEntityTwo"),
Eventlogeventing.getDescriptor())
.registerEventSourcedEntity(
io.cloudstate.samples.eventsourced.shoppingcart.ShoppingCartEntity.class,
com.example.shoppingcart.Shoppingcart.getDescriptor().findServiceByName("ShoppingCart"),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright 2019 Lightbend Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.cloudstate.javasupport.tck.model.eventlogeventing;

import akka.NotUsed;
import akka.stream.javadsl.Source;
import io.cloudstate.javasupport.CloudEvent;
import io.cloudstate.javasupport.action.Action;
import io.cloudstate.javasupport.action.ActionContext;
import io.cloudstate.javasupport.action.ActionReply;
import io.cloudstate.javasupport.action.CallHandler;
import io.cloudstate.tck.model.EventLogSubscriberModel;
import io.cloudstate.tck.model.Eventlogeventing;

@Action
public class EventLogSubscriber {

@CallHandler
public ActionReply<Eventlogeventing.Response> processEventOne(
ActionContext context, CloudEvent cloudEvent, Eventlogeventing.EventOne eventOne) {
return convert(context, cloudEvent, eventOne.getStep());
}

@CallHandler
public Source<ActionReply<Eventlogeventing.Response>, NotUsed> processEventTwo(
ActionContext context, CloudEvent cloudEvent, Eventlogeventing.EventTwo eventTwo) {
return Source.from(eventTwo.getStepList()).map(step -> convert(context, cloudEvent, step));
}

@CallHandler
public Eventlogeventing.Response effect(Eventlogeventing.EffectRequest request) {
return Eventlogeventing.Response.newBuilder()
.setId(request.getId())
.setMessage(request.getMessage())
.build();
}

@CallHandler
public Eventlogeventing.Response processAnyEvent(JsonMessage jsonMessage, CloudEvent cloudEvent) {
return Eventlogeventing.Response.newBuilder()
.setId(cloudEvent.subject().orElse(""))
.setMessage(jsonMessage.message)
.build();
}

private ActionReply<Eventlogeventing.Response> convert(
ActionContext context, CloudEvent cloudEvent, Eventlogeventing.ProcessStep step) {
String id = cloudEvent.subject().orElse("");
if (step.hasReply()) {
return ActionReply.message(
Eventlogeventing.Response.newBuilder()
.setId(id)
.setMessage(step.getReply().getMessage())
.build());
} else if (step.hasForward()) {
return ActionReply.forward(
context
.serviceCallFactory()
.lookup(EventLogSubscriberModel.name, "Effect", Eventlogeventing.EffectRequest.class)
.createCall(
Eventlogeventing.EffectRequest.newBuilder()
.setId(id)
.setMessage(step.getForward().getMessage())
.build()));
} else {
throw new RuntimeException("No reply or forward");
}
}
}
Loading

0 comments on commit 1ac498c

Please sign in to comment.