Skip to content

Commit

Permalink
Merge pull request #1203 from Horusiath/fsapi-jobject-deserialization
Browse files Browse the repository at this point in the history
FSharp API: fixed problem with JObject deserialization
  • Loading branch information
Aaronontheweb committed Aug 7, 2015
2 parents 440ead2 + 91c86c4 commit 3a89069
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 71 deletions.
5 changes: 5 additions & 0 deletions src/core/Akka.FSharp.Tests/Akka.FSharp.Tests.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@
<Name>Akka.FSharp</Name>
<Project>{81574240-BC31-4BE4-B447-ADF0D32F4246}</Project>
</ProjectReference>
<ProjectReference Include="..\Akka.Remote\Akka.Remote.csproj">
<Name>Akka.Remote</Name>
<Project>{ea4ff8fd-7c53-49c8-b9aa-02e458b3e6a7}</Project>
<Private>True</Private>
</ProjectReference>
<ProjectReference Include="..\Akka.TestKit\Akka.TestKit.csproj">
<Name>Akka.TestKit</Name>
<Project>{0D3CBAD0-BBDB-43E5-AFC4-ED1D3ECDC224}</Project>
Expand Down
61 changes: 29 additions & 32 deletions src/core/Akka.FSharp.Tests/ApiTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -39,39 +39,36 @@ type TestUnion2 =
| C of string * TestUnion
| D of int


[<Fact>]
let ``can serialize discriminated unions`` () =
let x = B (23,"hello")
use sys = System.create "system" (Configuration.defaultConfig())
let serializer = sys.Serialization.FindSerializerFor x
let bytes = serializer.ToBinary x
let des = serializer.FromBinary (bytes, typeof<TestUnion>) :?> TestUnion
des
|> equals x
let ``can serialize and deserialize discriminated unions over remote nodes`` () =
let remoteConfig port =
sprintf """
akka {
actor {
ask-timeout = 5s
provider = "Akka.Remote.RemoteActorRefProvider, Akka.Remote"
}
remote {
helios.tcp {
port = %i
hostname = localhost
}
}
}
""" port
|> Configuration.parse

[<Fact>]
let ``can serialize nested discriminated unions`` () =
let x = C("bar",B (23,"hello"))
use sys = System.create "system" (Configuration.defaultConfig())
let serializer = sys.Serialization.FindSerializerFor x
let bytes = serializer.ToBinary x
let des = serializer.FromBinary (bytes, typeof<TestUnion2>) :?> TestUnion2
des
|> equals x
use server = System.create "server-system" (remoteConfig 9911)
use client = System.create "client-system" (remoteConfig 0)

type testType1 =
string * int
let aref =
spawne client "a-1" <@ actorOf2 (fun mailbox msg ->
match msg with
| C("a-11", B(11, "a-12")) -> mailbox.Sender() <! msg
| _ -> mailbox.Unhandled msg) @>
[SpawnOption.Deploy (Deploy(RemoteScope (Address.Parse "akka.tcp://server-system@localhost:9911")))]
let msg = C("a-11", B(11, "a-12"))
let response = aref <? msg |> Async.RunSynchronously
response
|> equals msg

type testType2 =
| V2 of testType1

[<Fact>]
let MyTest () =
let x = V2("hello!",123)
use sys = System.create "system" (Configuration.defaultConfig())
let serializer = sys.Serialization.FindSerializerFor x
let bytes = serializer.ToBinary x
let des = serializer.FromBinary (bytes, typeof<testType2>) :?> testType2
des
|> equals x
99 changes: 67 additions & 32 deletions src/core/Akka.FSharp/FsApi.fs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,66 @@ open System
open Microsoft.FSharp.Quotations
open Microsoft.FSharp.Linq.QuotationEvaluation

module Serialization =
open Nessos.FsPickler
open Akka.Serialization

let internal serializeToBinary (fsp:BinarySerializer) o =
use stream = new System.IO.MemoryStream()
fsp.Serialize(stream, o)
stream.ToArray()

let internal deserializeFromBinary<'t> (fsp:BinarySerializer) (bytes: byte array) =
use stream = new System.IO.MemoryStream(bytes)
fsp.Deserialize<'t> stream

let private jobjectType = Type.GetType("Newtonsoft.Json.Linq.JObject, Newtonsoft.Json")
let private jsonSerlizerType = Type.GetType("Newtonsoft.Json.JsonSerializer, Newtonsoft.Json")
let private toObjectMethod = jobjectType.GetMethod("ToObject", [|typeof<System.Type>; jsonSerlizerType|])

let tryDeserializeJObject jsonSerializer o : 'Message option =
let t = typeof<'Message>
if o <> null && o.GetType().Equals jobjectType
then
try
let res = toObjectMethod.Invoke(o, [|t; jsonSerializer|])
Some (res :?> 'Message)
with
| _ -> None // type conversion failed (passed JSON is not of expected type)
else None


// used for top level serialization
type ExprSerializer(system) =
inherit Serializer(system)
let fsp = FsPickler.CreateBinary()
override __.Identifier = 9
override __.IncludeManifest = true
override __.ToBinary(o) = serializeToBinary fsp o
override __.FromBinary(bytes, _) = deserializeFromBinary fsp bytes


let internal exprSerializationSupport (system: ActorSystem) =
let serializer = ExprSerializer(system :?> ExtendedActorSystem)
system.Serialization.AddSerializer(serializer)
system.Serialization.AddSerializationMap(typeof<Expr>, serializer)

[<AutoOpen>]
module Actors =
open System.Threading.Tasks

let private tryCast (t:Task<obj>) : 'Message =
match t.Result with
| :? 'Message as m -> m
| o ->
let context = Akka.Actor.Internal.InternalCurrentActorCellKeeper.Current
if context = null
then failwith "Cannot cast JObject outside the actor system context "
else
let serializer = context.System.Serialization.FindSerializerForType typeof<'Message> :?> Akka.Serialization.NewtonSoftJsonSerializer
match Serialization.tryDeserializeJObject serializer.Serializer o with
| Some m -> m
| None -> raise (InvalidCastException("Tried to cast JObject to " + typeof<'Message>.ToString()))

/// <summary>
/// Unidirectional send operator.
Expand All @@ -26,7 +83,9 @@ module Actors =
/// Bidirectional send operator. Sends a message object directly to actor
/// tracked by actorRef and awaits for response send back from corresponding actor.
/// </summary>
let inline (<?) (tell : #ICanTell) (msg : obj) : Async<'Message> = tell.Ask<'Message> msg |> Async.AwaitTask
let (<?) (tell : #ICanTell) (msg : obj) : Async<'Message> =
tell.Ask(msg).ContinueWith(Func<_,'Message>(tryCast), TaskContinuationOptions.AttachedToParent|||TaskContinuationOptions.ExecuteSynchronously)
|> Async.AwaitTask

/// Pipes an output of asynchronous expression directly to the recipients mailbox.
let pipeTo (computation : Async<'T>) (recipient : ICanTell) (sender : IActorRef) : unit =
Expand Down Expand Up @@ -239,16 +298,20 @@ module Actors =
member __.Stash() = (this :> IWithUnboundedStash).Stash.Stash()
member __.Unstash() = (this :> IWithUnboundedStash).Stash.Unstash()
member __.UnstashAll() = (this :> IWithUnboundedStash).Stash.UnstashAll() }

new(actor : Expr<Actor<'Message> -> Cont<'Message, 'Returned>>) = FunActor(actor.Compile () ())
member __.Sender() : IActorRef = base.Sender
member __.Unhandled msg = base.Unhandled msg
override x.OnReceive msg =
match state with
| Func f ->
match msg with
| :? 'Message as matched -> state <- f matched
| _ -> x.Unhandled msg
| :? 'Message as m -> state <- f m
| _ ->
let serializer = UntypedActor.Context.System.Serialization.FindSerializerForType typeof<obj> :?> Akka.Serialization.NewtonSoftJsonSerializer
match Serialization.tryDeserializeJObject serializer.Serializer msg with
| Some(m) -> state <- f m
| None -> x.Unhandled msg
| Return _ -> x.PostStop()
override x.PostStop() =
base.PostStop ()
Expand Down Expand Up @@ -337,35 +400,7 @@ module Linq =
type Expression =
static member ToExpression(f : System.Linq.Expressions.Expression<System.Func<FunActor<'Message, 'v>>>) = toExpression<FunActor<'Message, 'v>> f
static member ToExpression<'Actor>(f : Quotations.Expr<(unit -> 'Actor)>) = toExpression<'Actor> (QuotationEvaluator.ToLinqExpression f)

module Serialization =
open Nessos.FsPickler
open Akka.Serialization

let internal serializeToBinary (fsp:BinarySerializer) o =
use stream = new System.IO.MemoryStream()
fsp.Serialize(stream, o)
stream.ToArray()

let internal deserializeFromBinary<'t> (fsp:BinarySerializer) (bytes: byte array) =
use stream = new System.IO.MemoryStream(bytes)
fsp.Deserialize<'t> stream

// used for top level serialization
type ExprSerializer(system) =
inherit Serializer(system)
let fsp = FsPickler.CreateBinary()
override __.Identifier = 9
override __.IncludeManifest = true
override __.ToBinary(o) = serializeToBinary fsp o
override __.FromBinary(bytes, _) = deserializeFromBinary fsp bytes


let internal exprSerializationSupport (system: ActorSystem) =
let serializer = ExprSerializer(system :?> ExtendedActorSystem)
system.Serialization.AddSerializer(serializer)
system.Serialization.AddSerializationMap(typeof<Expr>, serializer)

[<RequireQualifiedAccess>]
module Configuration =

Expand Down
36 changes: 29 additions & 7 deletions src/core/Akka.Persistence.FSharp/FsApi.fs
Original file line number Diff line number Diff line change
Expand Up @@ -142,17 +142,25 @@ type FunPersistentActor<'Command, 'Event, 'State>(aggregate: Aggregate<'Command,
member __.SaveSnapshot state = this.SaveSnapshot(state)
member __.DeleteSnapshot seqNr timestamp = this.DeleteSnapshot(seqNr, timestamp)
member __.DeleteSnapshots criteria = this.DeleteSnapshots(criteria) }

member __.Sender() : IActorRef = base.Sender
member __.Unhandled msg = base.Unhandled msg
override x.OnCommand (msg: obj) =
match msg with
| :? 'Command as cmd -> aggregate.exec mailbox state cmd
| _ -> () // ignore?
| _ ->
let serializer = UntypedActor.Context.System.Serialization.FindSerializerForType typeof<obj> :?> Akka.Serialization.NewtonSoftJsonSerializer
match Serialization.tryDeserializeJObject serializer.Serializer msg with
| Some(cmd) -> aggregate.exec mailbox state cmd
| None -> x.Unhandled msg
override x.OnRecover (msg: obj) =
match msg with
| :? 'Event as e -> state <- aggregate.apply mailbox state e
| _ -> () // ignore?
| _ ->
let serializer = UntypedActor.Context.System.Serialization.FindSerializerForType typeof<obj> :?> Akka.Serialization.NewtonSoftJsonSerializer
match Serialization.tryDeserializeJObject serializer.Serializer msg with
| Some(e) -> state <- aggregate.apply mailbox state e
| None -> x.Unhandled msg
override x.PostStop () =
base.PostStop ()
List.iter (fun fn -> fn()) deferables
Expand Down Expand Up @@ -265,8 +273,14 @@ type FunPersistentView<'Event, 'State>(perspective: Perspective<'Event, 'State>,
match msg with
| :? 'Event as e ->
state <- perspective.apply mailbox state e
true
| _ -> false // ignore?
true
| _ ->
let serializer = UntypedActor.Context.System.Serialization.FindSerializerForType typeof<obj> :?> Akka.Serialization.NewtonSoftJsonSerializer
match Serialization.tryDeserializeJObject serializer.Serializer msg with
| Some(e) ->
state <- perspective.apply mailbox state e
true
| None -> false
override x.PostStop () =
base.PostStop ()
List.iter (fun fn -> fn()) deferables
Expand Down Expand Up @@ -335,12 +349,20 @@ type Deliverer<'Command, 'Event, 'State>(aggregate: DeliveryAggregate<'Command,
override x.ReceiveCommand (msg: obj) =
match msg with
| :? 'Command as cmd -> aggregate.exec mailbox state cmd
| _ -> () // ignore?
| _ ->
let serializer = UntypedActor.Context.System.Serialization.FindSerializerForType typeof<obj> :?> Akka.Serialization.NewtonSoftJsonSerializer
match Serialization.tryDeserializeJObject serializer.Serializer msg with
| Some(cmd) -> aggregate.exec mailbox state cmd
| None -> x.Unhandled msg
true
override x.ReceiveRecover (msg: obj) =
match msg with
| :? 'Event as e -> state <- aggregate.apply mailbox state e
| _ -> () // ignore?
| _ ->
let serializer = UntypedActor.Context.System.Serialization.FindSerializerForType typeof<obj> :?> Akka.Serialization.NewtonSoftJsonSerializer
match Serialization.tryDeserializeJObject serializer.Serializer msg with
| Some(e) -> state <- aggregate.apply mailbox state e
| None -> x.Unhandled msg
true
override x.PostStop () =
base.PostStop ()
Expand Down
1 change: 1 addition & 0 deletions src/core/Akka/Serialization/NewtonSoftJsonSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public class NewtonSoftJsonSerializer : Serializer
private readonly JsonSerializer _serializer;

public JsonSerializerSettings Settings { get { return _settings; } }
public object Serializer { get { return _serializer; } }

/// <summary>
/// Initializes a new instance of the <see cref="NewtonSoftJsonSerializer" /> class.
Expand Down

0 comments on commit 3a89069

Please sign in to comment.