Skip to content

Commit

Permalink
Merge pull request #148 from permaweb/feat/patch@1.0
Browse files Browse the repository at this point in the history
feat: add a device that searches the `patch-from` location and applies patch messages
  • Loading branch information
samcamwilliams authored Feb 23, 2025
2 parents 32e5119 + aa1b246 commit b0a058e
Show file tree
Hide file tree
Showing 8 changed files with 328 additions and 96 deletions.
65 changes: 65 additions & 0 deletions src/dev_delegated_compute.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
%%% @doc Simple wrapper module that enables compute on remote machines,
%%% implementing the JSON-Iface. This can be used either as a standalone, to
%%% bring trusted results into the local node, or as the `Execution-Device' of
%%% an AO process.
-module(dev_delegated_compute).
-export([init/3, compute/3, normalize/3, snapshot/3]).
-include("include/hb.hrl").
-include_lib("eunit/include/eunit.hrl").

%% @doc Initialize or normalize the compute-lite device. For now, we don't
%% need to do anything special here.
init(Msg1, _Msg2, _Opts) ->
{ok, Msg1}.
normalize(Msg1, _Msg2, _Opts) -> {ok, Msg1}.
snapshot(Msg1, _Msg2, _Opts) -> {ok, Msg1}.

compute(Msg1, Msg2, Opts) ->
ProcessID = hb_converge:get(<<"process/id">>, Msg1, Opts),
Slot = hb_converge:get(<<"slot">>, Msg2, Opts),
?event(push, {compute_lite_called, {process_id, ProcessID}, {slot, Slot}}),
OutputPrefix = dev_stack:prefix(Msg1, Msg2, Opts),
Accept = hb_converge:get(<<"accept">>, Msg2, <<"application/http">>, Opts),
ProcessID =
hb_converge:get_first(
[
{Msg1, <<"process/id">>},
{Msg2, <<"process-id">>}
],
Opts
),
{ok, JSONRes} = do_compute(ProcessID, Slot, Opts),
?event(push, {compute_lite_res, {process_id, ProcessID}, {slot, Slot}, {json_res, JSONRes}}),
{ok, Msg} = dev_json_iface:json_to_message(JSONRes, Opts),
{ok,
hb_converge:set(
Msg1,
#{
<<OutputPrefix/binary, "/results">> => Msg,
<<OutputPrefix/binary, "/results/json">> => JSONRes
},
Opts
)
}.

%% @doc Execute computation on a remote machine via relay and the JSON-Iface.
do_compute(ProcID, Slot, Opts) ->
?event(debug_leader, {do_compute_called, {process_id, ProcID}, {slot, Slot}}),
Res =
hb_converge:resolve(#{ <<"device">> => <<"relay@1.0">> }, #{
<<"path">> => <<"call">>,
<<"relay-path">> =>
<<
"/result/",
(integer_to_binary(Slot))/binary,
"?process-id=",
ProcID/binary
>>
},
Opts
),
?event(debug_leader, {res, Res}),
{ok, Response} = Res,
JSONRes = hb_converge:get(<<"body">>, Response, Opts),
?event({json_res, JSONRes}),
{ok, JSONRes}.
77 changes: 18 additions & 59 deletions src/dev_genesis_wasm.erl
Original file line number Diff line number Diff line change
@@ -1,65 +1,24 @@
%%% @doc Simple wrapper module that enables compute on remote machines,
%%% implementing the JSON-Iface. This can be used either as a standalone, to
%%% bring trusted results into the local node, or as the `Execution-Device' of
%%% an AO process.
%%% @doc A device that mimics an environment suitable for `legacynet` AO
%%% processes, using HyperBEAM infrastructure. This allows existing `legacynet`
%%% AO process definitions to be used in HyperBEAM.
-module(dev_genesis_wasm).
-export([init/3, compute/3, normalize/3, snapshot/3]).
-include("include/hb.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("include/hb.hrl").

%% @doc Initialize or normalize the compute-lite device. For now, we don't
%% need to do anything special here.
init(Msg1, _Msg2, _Opts) ->
{ok, Msg1}.
normalize(Msg1, _Msg2, _Opts) -> {ok, Msg1}.
snapshot(Msg1, _Msg2, _Opts) -> {ok, Msg1}.
%% @doc Initialize the device.
init(Msg, _Msg2, _Opts) -> {ok, Msg}.

compute(Msg1, Msg2, Opts) ->
ProcessID = hb_converge:get(<<"process/id">>, Msg1, Opts),
Slot = hb_converge:get(<<"slot">>, Msg2, Opts),
?event(push, {compute_lite_called, {process_id, ProcessID}, {slot, Slot}}),
OutputPrefix = dev_stack:prefix(Msg1, Msg2, Opts),
Accept = hb_converge:get(<<"accept">>, Msg2, <<"application/http">>, Opts),
ProcessID =
hb_converge:get_first(
[
{Msg1, <<"process/id">>},
{Msg2, <<"process-id">>}
],
Opts
),
{ok, JSONRes} = do_compute(ProcessID, Slot, Opts),
?event(push, {compute_lite_res, {process_id, ProcessID}, {slot, Slot}, {json_res, JSONRes}}),
{ok, Msg} = dev_json_iface:json_to_message(JSONRes, Opts),
{ok,
hb_converge:set(
Msg1,
#{
<<OutputPrefix/binary, "/results">> => Msg,
<<OutputPrefix/binary, "/results/json">> => JSONRes
},
Opts
)
}.
%% @doc All the `delegated-compute@1.0` device to execute the request. We then apply
%% the `patch@1.0` device, applying any state patches that the AO process may have
%% requested.
compute(Msg, Msg2, Opts) ->
{ok, Msg3} = hb_converge:resolve(Msg, {as, <<"delegated-compute@1.0">>, Msg2}, Opts),
{ok, Msg4} = hb_converge:resolve(Msg3, {as, <<"patch@1.0">>, Msg2}, Opts),
{ok, Msg4}.

%% @doc Execute computation on a remote machine via relay and the JSON-Iface.
do_compute(ProcID, Slot, Opts) ->
?event(debug_leader, {do_compute_called, {process_id, ProcID}, {slot, Slot}}),
Res =
hb_converge:resolve(#{ <<"device">> => <<"relay@1.0">> }, #{
<<"path">> => <<"call">>,
<<"relay-path">> =>
<<
"/result/",
(integer_to_binary(Slot))/binary,
"?process-id=",
ProcID/binary
>>
},
Opts
),
?event(debug_leader, {res, Res}),
{ok, Response} = Res,
JSONRes = hb_converge:get(<<"body">>, Response, Opts),
?event({json_res, JSONRes}),
{ok, JSONRes}.
%% @doc Normalize the device.
normalize(Msg, _Msg2, _Opts) -> {ok, Msg}.

%% @doc Snapshot the device.
snapshot(Msg, _Msg2, _Opts) -> {ok, Msg}.
161 changes: 161 additions & 0 deletions src/dev_patch.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
%%% @doc A device that finds `PATCH' requests in the `results/outbox'
%%% of its message, and applies them to it. This can be useful for processes
%%% whose computation would like to manipulate data outside of the `results' key
%%% of its message.
-module(dev_patch).
-export([init/3, compute/3, normalize/3, snapshot/3]).
-include_lib("eunit/include/eunit.hrl").
-include_lib("include/hb.hrl").

%% @doc Default process device hooks.
init(Msg1, _Msg2, _Opts) -> {ok, Msg1}.
normalize(Msg1, _Msg2, _Opts) -> {ok, Msg1}.
snapshot(Msg1, _Msg2, _Opts) -> {ok, Msg1}.

%% @doc Find `PATCH' requests in the `results/outbox' of the message, and apply
%% them to the state.
compute(Msg1, Msg2, Opts) ->
% Find the input keys.
PatchFrom = hb_converge:get_first(
[
{Msg2, <<"patch-from">>},
{Msg1, <<"patch-from">>}
],
<<"/results/outbox">>,
Opts
),
PatchTo = hb_converge:get_first(
[
{Msg2, <<"patch-to">>},
{Msg1, <<"patch-to">>}
],
<<"/">>,
Opts
),
?event({patch_from, PatchFrom}),
?event({patch_to, PatchTo}),
% Get the outbox from the message.
Outbox = hb_converge:get(PatchFrom, Msg1, #{}, Opts),
% Find all messages with the PATCH request.
Patches =
maps:filter(
fun(_, Msg) ->
hb_converge:get(<<"method">>, Msg, Opts) == <<"PATCH">>
end,
Outbox
),
OutboxWithoutPatches = maps:without(maps:keys(Patches), Outbox),
% Find the state to apply the patches to.
% Apply the patches to the state.
PatchedSubmessage =
maps:fold(
fun(_, Patch, MsgN) ->
?event({patching, {patch, Patch}, {before, MsgN}}),
Res = hb_converge:set(
MsgN,
maps:without([<<"method">>], Patch),
Opts
),
?event({patched, {'after', Res}}),
Res
end,
case PatchTo of
not_found -> Msg1;
PatchTo -> hb_converge:get(PatchTo, Msg1, Opts)
end,
Patches
),
PatchedState =
case PatchTo of
<<"/">> -> PatchedSubmessage;
_ -> hb_converge:set(Msg1, PatchTo, PatchedSubmessage, Opts)
end,
% Return the patched message.
Res = {
ok,
hb_converge:set(
PatchedState,
<<"/results/outbox">>,
OutboxWithoutPatches,
Opts
)
},
?event({patch_result, Res}),
Res.

%%% Tests

uninitialized_patch_test() ->
InitState = #{
<<"device">> => <<"patch@1.0">>,
<<"results">> => #{
<<"outbox">> => #{
<<"1">> => #{
<<"method">> => <<"PATCH">>,
<<"prices">> => #{
<<"apple">> => 100,
<<"banana">> => 200
}
},
<<"2">> => #{
<<"method">> => <<"GET">>,
<<"prices">> => #{
<<"apple">> => 1000
}
}
}
},
<<"other-message">> => <<"other-value">>,
<<"patch-to">> => <<"/">>,
<<"patch-from">> => <<"/results/outbox">>
},
{ok, ResolvedState} =
hb_converge:resolve(
InitState,
<<"compute">>,
#{}
),
?event({resolved_state, ResolvedState}),
?assertEqual(
100,
hb_converge:get(<<"prices/apple">>, ResolvedState, #{})
),
?assertMatch(
not_found,
hb_converge:get(<<"results/outbox/1">>, ResolvedState, #{})
).

patch_to_submessage_test() ->
InitState = #{
<<"device">> => <<"patch@1.0">>,
<<"results">> => #{
<<"outbox">> => #{
<<"1">> => #{
<<"method">> => <<"PATCH">>,
<<"prices">> => #{
<<"apple">> => 100,
<<"banana">> => 200
}
}
}
},
<<"state">> => #{
<<"prices">> => #{
<<"apple">> => 1000
}
},
<<"other-message">> => <<"other-value">>,
<<"patch-to">> => <<"/state">>,
<<"patch-from">> => <<"/results/outbox">>
},
{ok, ResolvedState} =
hb_converge:resolve(
InitState,
<<"compute">>,
#{}
),
?event({resolved_state, ResolvedState}),
?assertEqual(
100,
hb_converge:get(<<"state/prices/apple">>, ResolvedState, #{})
).
Loading

0 comments on commit b0a058e

Please sign in to comment.