diff --git a/src/core/Akka.FSharp.Tests/Akka.FSharp.Tests.fsproj b/src/core/Akka.FSharp.Tests/Akka.FSharp.Tests.fsproj index 521e0632741..09b0a5050f5 100644 --- a/src/core/Akka.FSharp.Tests/Akka.FSharp.Tests.fsproj +++ b/src/core/Akka.FSharp.Tests/Akka.FSharp.Tests.fsproj @@ -93,6 +93,11 @@ Akka.FSharp {81574240-BC31-4BE4-B447-ADF0D32F4246} + + Akka.Remote + {ea4ff8fd-7c53-49c8-b9aa-02e458b3e6a7} + True + Akka.TestKit {0D3CBAD0-BBDB-43E5-AFC4-ED1D3ECDC224} diff --git a/src/core/Akka.FSharp.Tests/ApiTests.fs b/src/core/Akka.FSharp.Tests/ApiTests.fs index 14c243f4193..bd064857fa8 100644 --- a/src/core/Akka.FSharp.Tests/ApiTests.fs +++ b/src/core/Akka.FSharp.Tests/ApiTests.fs @@ -39,39 +39,36 @@ type TestUnion2 = | C of string * TestUnion | D of int - [] -let ``can serialize discriminated unions`` () = - let x = B (23,"hello") - use sys = System.create "system" (Configuration.defaultConfig()) - let serializer = sys.Serialization.FindSerializerFor x - let bytes = serializer.ToBinary x - let des = serializer.FromBinary (bytes, typeof) :?> TestUnion - des - |> equals x +let ``can serialize and deserialize discriminated unions over remote nodes`` () = + let remoteConfig port = + sprintf """ + akka { + actor { + ask-timeout = 5s + provider = "Akka.Remote.RemoteActorRefProvider, Akka.Remote" + } + remote { + helios.tcp { + port = %i + hostname = localhost + } + } + } + """ port + |> Configuration.parse -[] -let ``can serialize nested discriminated unions`` () = - let x = C("bar",B (23,"hello")) - use sys = System.create "system" (Configuration.defaultConfig()) - let serializer = sys.Serialization.FindSerializerFor x - let bytes = serializer.ToBinary x - let des = serializer.FromBinary (bytes, typeof) :?> TestUnion2 - des - |> equals x + use server = System.create "server-system" (remoteConfig 9911) + use client = System.create "client-system" (remoteConfig 0) -type testType1 = - string * int + let aref = + spawne client "a-1" <@ actorOf2 (fun mailbox msg -> + match msg with + | C("a-11", B(11, "a-12")) -> mailbox.Sender() mailbox.Unhandled msg) @> + [SpawnOption.Deploy (Deploy(RemoteScope (Address.Parse "akka.tcp://server-system@localhost:9911")))] + let msg = C("a-11", B(11, "a-12")) + let response = aref Async.RunSynchronously + response + |> equals msg -type testType2 = - | V2 of testType1 - -[] -let MyTest () = - let x = V2("hello!",123) - use sys = System.create "system" (Configuration.defaultConfig()) - let serializer = sys.Serialization.FindSerializerFor x - let bytes = serializer.ToBinary x - let des = serializer.FromBinary (bytes, typeof) :?> testType2 - des - |> equals x \ No newline at end of file diff --git a/src/core/Akka.FSharp/FsApi.fs b/src/core/Akka.FSharp/FsApi.fs index e678546698f..e22338f11db 100644 --- a/src/core/Akka.FSharp/FsApi.fs +++ b/src/core/Akka.FSharp/FsApi.fs @@ -12,9 +12,66 @@ open System open Microsoft.FSharp.Quotations open Microsoft.FSharp.Linq.QuotationEvaluation +module Serialization = + open Nessos.FsPickler + open Akka.Serialization + + let internal serializeToBinary (fsp:BinarySerializer) o = + use stream = new System.IO.MemoryStream() + fsp.Serialize(stream, o) + stream.ToArray() + + let internal deserializeFromBinary<'t> (fsp:BinarySerializer) (bytes: byte array) = + use stream = new System.IO.MemoryStream(bytes) + fsp.Deserialize<'t> stream + + let private jobjectType = Type.GetType("Newtonsoft.Json.Linq.JObject, Newtonsoft.Json") + let private jsonSerlizerType = Type.GetType("Newtonsoft.Json.JsonSerializer, Newtonsoft.Json") + let private toObjectMethod = jobjectType.GetMethod("ToObject", [|typeof; jsonSerlizerType|]) + + let tryDeserializeJObject jsonSerializer o : 'Message option = + let t = typeof<'Message> + if o <> null && o.GetType().Equals jobjectType + then + try + let res = toObjectMethod.Invoke(o, [|t; jsonSerializer|]) + Some (res :?> 'Message) + with + | _ -> None // type conversion failed (passed JSON is not of expected type) + else None + + + // used for top level serialization + type ExprSerializer(system) = + inherit Serializer(system) + let fsp = FsPickler.CreateBinary() + override __.Identifier = 9 + override __.IncludeManifest = true + override __.ToBinary(o) = serializeToBinary fsp o + override __.FromBinary(bytes, _) = deserializeFromBinary fsp bytes + + + let internal exprSerializationSupport (system: ActorSystem) = + let serializer = ExprSerializer(system :?> ExtendedActorSystem) + system.Serialization.AddSerializer(serializer) + system.Serialization.AddSerializationMap(typeof, serializer) [] module Actors = + open System.Threading.Tasks + + let private tryCast (t:Task) : 'Message = + match t.Result with + | :? 'Message as m -> m + | o -> + let context = Akka.Actor.Internal.InternalCurrentActorCellKeeper.Current + if context = null + then failwith "Cannot cast JObject outside the actor system context " + else + let serializer = context.System.Serialization.FindSerializerForType typeof<'Message> :?> Akka.Serialization.NewtonSoftJsonSerializer + match Serialization.tryDeserializeJObject serializer.Serializer o with + | Some m -> m + | None -> raise (InvalidCastException("Tried to cast JObject to " + typeof<'Message>.ToString())) /// /// Unidirectional send operator. @@ -26,7 +83,9 @@ module Actors = /// Bidirectional send operator. Sends a message object directly to actor /// tracked by actorRef and awaits for response send back from corresponding actor. /// - let inline ( = tell.Ask<'Message> msg |> Async.AwaitTask + let ( = + tell.Ask(msg).ContinueWith(Func<_,'Message>(tryCast), TaskContinuationOptions.AttachedToParent|||TaskContinuationOptions.ExecuteSynchronously) + |> Async.AwaitTask /// Pipes an output of asynchronous expression directly to the recipients mailbox. let pipeTo (computation : Async<'T>) (recipient : ICanTell) (sender : IActorRef) : unit = @@ -239,7 +298,7 @@ module Actors = member __.Stash() = (this :> IWithUnboundedStash).Stash.Stash() member __.Unstash() = (this :> IWithUnboundedStash).Stash.Unstash() member __.UnstashAll() = (this :> IWithUnboundedStash).Stash.UnstashAll() } - + new(actor : Expr -> Cont<'Message, 'Returned>>) = FunActor(actor.Compile () ()) member __.Sender() : IActorRef = base.Sender member __.Unhandled msg = base.Unhandled msg @@ -247,8 +306,12 @@ module Actors = match state with | Func f -> match msg with - | :? 'Message as matched -> state <- f matched - | _ -> x.Unhandled msg + | :? 'Message as m -> state <- f m + | _ -> + let serializer = UntypedActor.Context.System.Serialization.FindSerializerForType typeof :?> Akka.Serialization.NewtonSoftJsonSerializer + match Serialization.tryDeserializeJObject serializer.Serializer msg with + | Some(m) -> state <- f m + | None -> x.Unhandled msg | Return _ -> x.PostStop() override x.PostStop() = base.PostStop () @@ -337,35 +400,7 @@ module Linq = type Expression = static member ToExpression(f : System.Linq.Expressions.Expression>>) = toExpression> f static member ToExpression<'Actor>(f : Quotations.Expr<(unit -> 'Actor)>) = toExpression<'Actor> (QuotationEvaluator.ToLinqExpression f) - -module Serialization = - open Nessos.FsPickler - open Akka.Serialization - - let internal serializeToBinary (fsp:BinarySerializer) o = - use stream = new System.IO.MemoryStream() - fsp.Serialize(stream, o) - stream.ToArray() - - let internal deserializeFromBinary<'t> (fsp:BinarySerializer) (bytes: byte array) = - use stream = new System.IO.MemoryStream(bytes) - fsp.Deserialize<'t> stream - - // used for top level serialization - type ExprSerializer(system) = - inherit Serializer(system) - let fsp = FsPickler.CreateBinary() - override __.Identifier = 9 - override __.IncludeManifest = true - override __.ToBinary(o) = serializeToBinary fsp o - override __.FromBinary(bytes, _) = deserializeFromBinary fsp bytes - - let internal exprSerializationSupport (system: ActorSystem) = - let serializer = ExprSerializer(system :?> ExtendedActorSystem) - system.Serialization.AddSerializer(serializer) - system.Serialization.AddSerializationMap(typeof, serializer) - [] module Configuration = diff --git a/src/core/Akka.Persistence.FSharp/FsApi.fs b/src/core/Akka.Persistence.FSharp/FsApi.fs index 082c194bb37..7c542d798c3 100644 --- a/src/core/Akka.Persistence.FSharp/FsApi.fs +++ b/src/core/Akka.Persistence.FSharp/FsApi.fs @@ -142,17 +142,25 @@ type FunPersistentActor<'Command, 'Event, 'State>(aggregate: Aggregate<'Command, member __.SaveSnapshot state = this.SaveSnapshot(state) member __.DeleteSnapshot seqNr timestamp = this.DeleteSnapshot(seqNr, timestamp) member __.DeleteSnapshots criteria = this.DeleteSnapshots(criteria) } - + member __.Sender() : IActorRef = base.Sender member __.Unhandled msg = base.Unhandled msg override x.OnCommand (msg: obj) = match msg with | :? 'Command as cmd -> aggregate.exec mailbox state cmd - | _ -> () // ignore? + | _ -> + let serializer = UntypedActor.Context.System.Serialization.FindSerializerForType typeof :?> Akka.Serialization.NewtonSoftJsonSerializer + match Serialization.tryDeserializeJObject serializer.Serializer msg with + | Some(cmd) -> aggregate.exec mailbox state cmd + | None -> x.Unhandled msg override x.OnRecover (msg: obj) = match msg with | :? 'Event as e -> state <- aggregate.apply mailbox state e - | _ -> () // ignore? + | _ -> + let serializer = UntypedActor.Context.System.Serialization.FindSerializerForType typeof :?> Akka.Serialization.NewtonSoftJsonSerializer + match Serialization.tryDeserializeJObject serializer.Serializer msg with + | Some(e) -> state <- aggregate.apply mailbox state e + | None -> x.Unhandled msg override x.PostStop () = base.PostStop () List.iter (fun fn -> fn()) deferables @@ -265,8 +273,14 @@ type FunPersistentView<'Event, 'State>(perspective: Perspective<'Event, 'State>, match msg with | :? 'Event as e -> state <- perspective.apply mailbox state e - true - | _ -> false // ignore? + true + | _ -> + let serializer = UntypedActor.Context.System.Serialization.FindSerializerForType typeof :?> Akka.Serialization.NewtonSoftJsonSerializer + match Serialization.tryDeserializeJObject serializer.Serializer msg with + | Some(e) -> + state <- perspective.apply mailbox state e + true + | None -> false override x.PostStop () = base.PostStop () List.iter (fun fn -> fn()) deferables @@ -335,12 +349,20 @@ type Deliverer<'Command, 'Event, 'State>(aggregate: DeliveryAggregate<'Command, override x.ReceiveCommand (msg: obj) = match msg with | :? 'Command as cmd -> aggregate.exec mailbox state cmd - | _ -> () // ignore? + | _ -> + let serializer = UntypedActor.Context.System.Serialization.FindSerializerForType typeof :?> Akka.Serialization.NewtonSoftJsonSerializer + match Serialization.tryDeserializeJObject serializer.Serializer msg with + | Some(cmd) -> aggregate.exec mailbox state cmd + | None -> x.Unhandled msg true override x.ReceiveRecover (msg: obj) = match msg with | :? 'Event as e -> state <- aggregate.apply mailbox state e - | _ -> () // ignore? + | _ -> + let serializer = UntypedActor.Context.System.Serialization.FindSerializerForType typeof :?> Akka.Serialization.NewtonSoftJsonSerializer + match Serialization.tryDeserializeJObject serializer.Serializer msg with + | Some(e) -> state <- aggregate.apply mailbox state e + | None -> x.Unhandled msg true override x.PostStop () = base.PostStop () diff --git a/src/core/Akka/Serialization/NewtonSoftJsonSerializer.cs b/src/core/Akka/Serialization/NewtonSoftJsonSerializer.cs index 2f7052b384f..a3d7ba650fe 100644 --- a/src/core/Akka/Serialization/NewtonSoftJsonSerializer.cs +++ b/src/core/Akka/Serialization/NewtonSoftJsonSerializer.cs @@ -29,6 +29,7 @@ public class NewtonSoftJsonSerializer : Serializer private readonly JsonSerializer _serializer; public JsonSerializerSettings Settings { get { return _settings; } } + public object Serializer { get { return _serializer; } } /// /// Initializes a new instance of the class.