From 340f54904f812799e68eb20f2b0709064cf97b5b Mon Sep 17 00:00:00 2001 From: Ashton Wiersdorf Date: Wed, 6 Nov 2024 13:31:46 -0700 Subject: [PATCH] Skip flakey test, document TCP transport, elixir-format --- lib/chorex.ex | 88 +++++++++++++------------------ test/chorex/socket_proxy_test.exs | 1 + 2 files changed, 37 insertions(+), 52 deletions(-) diff --git a/lib/chorex.ex b/lib/chorex.ex index 937eb91..06fe49c 100644 --- a/lib/chorex.ex +++ b/lib/chorex.ex @@ -330,62 +330,36 @@ defmodule Chorex do Note the 2-tuple: the first element is the module to be proxied, and the second element should be the PID of an already-running proxy. - ### Manually setting up the shared-state choreography (deprecated) + ### **Experimental** TCP transport setup - (*Note: we recommend using the `Chorex.start` mechanism now.*) + You can run choreographies over TCP. Instead of specifying the + implementing module's name in the actor ↦ module map, put a tuple + like `{:remote, local_port, remote_host, remote_port}`. A process + will begin listening on `local_port` and forward messages to the + proper actors on the current node. Messages going to a remote actor + will be buffered until a TCP connection is established, at which + point they'll be sent FIFO. - You need to be a little careful when setting up the shared state - choreography. Instead of setting up all the actors manually, you need - to set up *one* instance of each shared-state actor, then create - separate *sessions* for each instance of the choreography that you - want to run. + Example with hosts `alice.net` and `bob.net`: - Here is an example with two buyers trying to buy the same book: + Host `alice.net`: ```elixir - # Start up the buyers - b1 = spawn(MyBuyer, :init, [[]]) - b2 = spawn(MyBuyer, :init, [[]]) - - # Start up the seller proxy with the initial shared - # state (the stock of books in this case) - {:ok, px} = GenServer.start(Chorex.Proxy, %{"Anathem" => 1}) - - # Start sessions: one for each buyer - Proxy.begin_session(px, [b1], MySellerBackend, :init, []) - config1 = %{Buyer => b1, Seller => px, :super => self()} + Chorex.start(BasicRemote.Chorex, + %{SockAlice => SockAliceImpl, + SockBob => {:remote, 4242, "bob.net", 4243}}, []) + ``` - Proxy.begin_session(px, [b2], MySellerBackend, :init, []) - config2 = %{Buyer => b2, Seller => px, :super => self()} + Host `bob.net`: - # Send everyone their configuration - send(b1, {:config, config1}) - send(px, {:chorex, b1, {:config, config1}}) - send(b2, {:config, config2}) - send(px, {:chorex, b2, {:config, config2}}) + ```elixir + Chorex.start(BasicRemote.Chorex, + %{SockAlice => {:remote, 4243, "alice.net", 4242}, + SockBob => SockBobImpl}, []) ``` - The `Proxy.begin_sesion` function takes a proxy function, a list of - PIDs that partake in a given session, and a module, function, arglist - for the thing to proxy. - - **Sessions**: PIDs belonging to a session will have their messages - routed to the corresponding proxied process. The GenServer looks up - which session a PID belongs to, finds the proxied process linked to - that session, then forwards the message to that process. The exact - mechanisms of how this works may change in the future to accommodate - restarts. - - When you send the config information to a proxied process, you send it - through the proxy first, and you must wrap the message as shown above - with a process from the session you want to target as the second - element in the tuple; this just helps the proxy figure out the session - you want. - - That's it! If you run the above choreography, the process that kicks - this all off will get *one* message like `{:chorex_return, Buyer, :book_get}` - and *one* message like `{:chorex_return, Buyer, :darn_missed_it}`, - indicating that exactly one of the buyers got the coveted book. + **WARNING** this transport is *experimental* and not guaranteed to + work. We've had issues with message delivery during testing. PRs welcome! """ import WriterMonad @@ -714,9 +688,11 @@ defmodule Chorex do unquote(recver_exp) = receive do - {:chorex, ^tok, unquote(actor1), unquote(actor2), msg} -> msg + {:chorex, ^tok, unquote(actor1), unquote(actor2), msg} -> + msg + m -> - IO.inspect(m, label: "#{inspect self()} got unexpected message") + IO.inspect(m, label: "#{inspect(self())} got unexpected message") IO.inspect(tok, label: "tok") 42 end @@ -1288,8 +1264,15 @@ defmodule Chorex do end def merge_step( - {:__block__, _, [{:=, _, [{:tok, _, _}, {{:., _, [Access, :get]}, _, [{:config, _, _}, :session_token]}]} = tok_get, {:receive, _, _} = lhs_rcv]}, - {:__block__, _, [tok_get, {:receive, _, _} = rhs_rcv]}) do + {:__block__, _, + [ + {:=, _, + [{:tok, _, _}, {{:., _, [Access, :get]}, _, [{:config, _, _}, :session_token]}]} = + tok_get, + {:receive, _, _} = lhs_rcv + ]}, + {:__block__, _, [tok_get, {:receive, _, _} = rhs_rcv]} + ) do quote do unquote(tok_get) unquote(merge_step(lhs_rcv, rhs_rcv)) @@ -1318,7 +1301,8 @@ defmodule Chorex do # merge same branch def merge_step( - {:receive, m1, [[do: [{:->, m2, [[{:{}, m3, [:choice, tok, agent, dest, dir]}], branch1]}]]]}, + {:receive, m1, + [[do: [{:->, m2, [[{:{}, m3, [:choice, tok, agent, dest, dir]}], branch1]}]]]}, {:receive, _, [[do: [{:->, _, [[{:{}, _, [:choice, tok, agent, dest, dir]}], branch2]}]]]} ) do {:receive, m1, diff --git a/test/chorex/socket_proxy_test.exs b/test/chorex/socket_proxy_test.exs index 576caec..4230344 100644 --- a/test/chorex/socket_proxy_test.exs +++ b/test/chorex/socket_proxy_test.exs @@ -24,6 +24,7 @@ defmodule Chorex.SocketProxyTest do use BasicRemote.Chorex, :sockbob end + @tag :skip test "basic proxy works" do # Spin up two tasks to collect responses alice_receiver = Task.async(fn ->