Skip to content

Commit

Permalink
Merge pull request #142 from permaweb/feat/scheduler-locations-and-hints
Browse files Browse the repository at this point in the history
feat(M2): Support for legacynet scheduler look + new hints mechanic
  • Loading branch information
samcamwilliams authored Feb 20, 2025
2 parents 58132e3 + 48e58d0 commit 627c2fd
Show file tree
Hide file tree
Showing 3 changed files with 174 additions and 66 deletions.
2 changes: 1 addition & 1 deletion src/dev_codec_httpsig_conv.erl
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ to(TABM, Opts) when is_map(TABM) ->
<<"signature">> => Sig,
<<"signature-input">> => SigInput
};
not_found -> Enc2
_ -> Enc2
end.

% We need to generate a unique, reproducible boundary for the
Expand Down
11 changes: 6 additions & 5 deletions src/dev_json_iface.erl
Original file line number Diff line number Diff line change
Expand Up @@ -388,11 +388,12 @@ postprocess_outbox(Msg, Proc, Opts) ->
test_init() ->
application:ensure_all_started(hb).

json_to_message_test() ->
{ok, JSON} = file:read_file("test/example_json_iface_result.json"),
{ok, Msg} = json_to_message(JSON, #{}),
?event({msg, Msg}),
?assertEqual(<<"OK">>, hb_converge:get(<<"outbox/1/result">>, Msg, #{})).
% Disabled due to missing test file
% json_to_message_test() ->
% {ok, JSON} = file:read_file("test/example_json_iface_result.json"),
% {ok, Msg} = json_to_message(JSON, #{}),
% ?event({msg, Msg}),
% ?assertEqual(<<"OK">>, hb_converge:get(<<"outbox/1/result">>, Msg, #{})).

generate_stack(File) ->
generate_stack(File, <<"WASM">>).
Expand Down
227 changes: 167 additions & 60 deletions src/dev_scheduler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -160,23 +160,42 @@ schedule(Msg1, Msg2, Opts) ->
%% for this scheduler. If so, it schedules the message and returns the assignment.
post_schedule(Msg1, Msg2, Opts) ->
?event(scheduling_message),
% Find the target message to schedule:
ToSched = find_message_to_schedule(Msg1, Msg2, Opts),
?event({to_sched, ToSched}),
% Find the ProcessID of the target message:
% - If it is a Process, use the ID of the message.
% - If not, use the target as the ProcessID.
ProcID =
case hb_converge:get(<<"type">>, ToSched, not_found, Opts) of
<<"Process">> ->
hb_message:id(ToSched, all);
<<"Process">> -> hb_message:id(ToSched, all);
_ ->
case hb_converge:get(<<"target">>, ToSched, not_found, Opts) of
not_found ->
find_schedule_id(Msg1, Msg2, Opts);
not_found -> find_schedule_id(Msg1, Msg2, Opts);
Target -> Target
end
end,
?event({proc_id, ProcID}),
% Filter all unsigned keys from the source message.
case hb_message:with_only_attested(ToSched) of
{ok, Filtered} ->
do_post_schedule(ProcID, Filtered, Msg1, Opts);
{ok, OnlyAttested} ->
?event(
{post_schedule,
{schedule_id, ProcID},
{message, ToSched}
}
),
% Find the relevant scheduler server for the given process and
% message, start a new one if necessary, or return a redirect to the
% correct remote scheduler.
case find_server(ProcID, Msg1, ToSched, Opts) of
{local, PID} ->
?event({scheduling_message_locally, {proc_id, ProcID}, {pid, PID}}),
do_post_schedule(ProcID, PID, OnlyAttested, Opts);
{redirect, Redirect} ->
?event({redirecting_to_scheduler, {redirect, Redirect}}),
{ok, Redirect}
end;
{error, _} ->
{ok,
#{
Expand All @@ -187,58 +206,15 @@ post_schedule(Msg1, Msg2, Opts) ->
end.

%% @doc Post schedule the message.
do_post_schedule(ProcID, ToSched, Msg1, Opts) ->
PID =
case dev_scheduler_registry:find(ProcID, false, Opts) of
not_found ->
% Check if we are the scheduler for this process.
Address = hb_util:human_id(ar_wallet:to_address(
hb_opts:get(priv_wallet, hb:wallet(), Opts))),
Proc = find_process(Msg1, Opts),
SchedLoc =
hb_converge:get_first(
[
{Proc, <<"scheduler-location">>},
{ToSched, <<"scheduler-location">>},
{Proc, <<"scheduler">>},
{ToSched, <<"scheduler">>}
],
not_found,
Opts#{ hashpath => ignore }
),
case SchedLoc of
Address ->
% Start the scheduler process if we are the scheduler.
dev_scheduler_registry:find(ProcID, true, Opts);
not_found ->
throw(
{scheduler_location_not_found,
{proc_id, ProcID}
}
);
ProcScheduler ->
throw(
{scheduler_location_mismatch,
{local, Address},
{required, ProcScheduler}
}
)
end;
Proc -> Proc
end,
?event(
{post_schedule,
{schedule_id, ProcID},
{scheduler_pid, PID},
{message, ToSched}
}
),
do_post_schedule(ProcID, PID, Msg2, Opts) ->
% Should we verify the message again before scheduling?
Verified =
case hb_opts:get(verify_assignments, true, Opts) of
true -> hb_message:verify(ToSched, signers);
true -> hb_message:verify(Msg2, signers);
false -> true
end,
case {Verified, hb_converge:get(type, ToSched)} of
% Handle scheduling of the message if the message is valid.
case {Verified, hb_converge:get(type, Msg2)} of
{false, _} ->
{ok,
#{
Expand All @@ -247,19 +223,106 @@ do_post_schedule(ProcID, ToSched, Msg1, Opts) ->
}
};
{true, <<"Process">>} ->
hb_cache:write(ToSched, Opts),
spawn(fun() -> hb_client:upload(ToSched) end),
hb_cache:write(Msg2, Opts),
spawn(fun() -> hb_client:upload(Msg2) end),
?event(
{registering_new_process,
{proc_id, ProcID},
{pid, PID},
{is_alive, is_process_alive(PID)}
}
),
{ok, dev_scheduler_server:schedule(PID, ToSched)};
{ok, dev_scheduler_server:schedule(PID, Msg2)};
{true, _} ->
% If Message2 is not a process, use the ID of Message1 as the PID
{ok, dev_scheduler_server:schedule(PID, ToSched)}
{ok, dev_scheduler_server:schedule(PID, Msg2)}
end.

%% @doc Locate the correct scheduling server for a given process.
find_server(ProcID, Msg1, ToSched, Opts) ->
case get_hint(ProcID, Opts) of
{ok, Hint} ->
?event({found_hint_in_proc_id, Hint}),
generate_redirect(ProcID, Hint);
not_found ->
?event({no_hint_in_proc_id, ProcID}),
case dev_scheduler_registry:find(ProcID, false, Opts) of
PID when is_pid(PID) ->
?event({found_pid_in_local_registry, PID}),
{local, PID};
not_found ->
?event({no_pid_in_local_registry, ProcID}),
% Find the process from the message.
Proc = find_process(Msg1, Opts),
?event({found_process, Proc}),
% Check if we are the scheduler for this process.
Address = hb_util:human_id(ar_wallet:to_address(
hb_opts:get(priv_wallet, hb:wallet(), Opts))),
?event({local_address, Address}),
SchedLoc =
hb_converge:get_first(
[
{Proc, <<"scheduler">>},
{Proc, <<"scheduler-location">>},
{ToSched, <<"scheduler-location">>}
],
not_found,
Opts#{ hashpath => ignore }
),
?event({sched_loc, SchedLoc}),
case SchedLoc of
Address ->
% We are the scheduler. Start the server if it has not already
% been started.
{local, dev_scheduler_registry:find(ProcID, true, Opts)};
_ ->
% We are not the scheduler. Find it and return a redirect.
find_remote_scheduler(ProcID, SchedLoc, Opts)
end
end
end.

%% @doc If a hint is present in the string, return it. Else, return not_found.
get_hint(Str, Opts) ->
case hb_opts:get(scheduler_follow_hints, true, Opts) of
true ->
case binary:split(Str, <<"?">>, [global]) of
[_, QS] ->
QueryMap = maps:from_list(uri_string:dissect_query(QS)),
case maps:get(<<"hint">>, QueryMap, not_found) of
not_found -> not_found;
Hint -> {ok, Hint}
end;
_ -> not_found
end;
false -> not_found
end.

%% @doc Generate a redirect message to a scheduler.
generate_redirect(ProcID, URL) ->
{redirect,
#{
<<"status">> => 307,
<<"location">> => <<URL/binary, "/", ProcID/binary>>,
<<"method">> => <<"POST">>,
<<"body">> => <<"Redirecting to scheduler: ", URL/binary>>
}
}.

%% @doc Use the SchedulerLocation to the remote path and return a redirect.
find_remote_scheduler(ProcID, SchedulerLocation, Opts) ->
% Parse the SchedulerLocation to see if it has a hint. If there is a hint,
% we will use it to construct a redirect message.
case get_hint(SchedulerLocation, Opts) of
{ok, Hint} ->
% We have a hint. Construct a redirect message.
generate_redirect(ProcID, Hint);
not_found ->
{ok, SchedMsg} =
hb_gateway_client:scheduler_location(SchedulerLocation, Opts),
{ok, SchedURL} = hb_converge:resolve(SchedMsg, <<"url">>, Opts),
% We have a valid path. Construct a redirect message.
generate_redirect(ProcID, SchedURL)
end.

%% @doc Returns information about the current slot for a process.
Expand Down Expand Up @@ -427,8 +490,9 @@ checkpoint(State) -> {ok, State}.
%% @doc Generate a _transformed_ process message, not as they are generated
%% by users. See `dev_process' for examples of AO process messages.
test_process() -> test_process(hb:wallet()).
test_process(Wallet) ->
Address = hb_util:human_id(ar_wallet:to_address(Wallet)),
test_process(Wallet) when not is_binary(Wallet) ->
test_process(hb_util:human_id(ar_wallet:to_address(Wallet)));
test_process(Address) ->
#{
<<"device">> => <<"scheduler@1.0">>,
<<"device-stack">> =>
Expand Down Expand Up @@ -498,6 +562,49 @@ schedule_message_and_get_slot_test() ->
when CurrentSlot > 0,
hb_converge:resolve(Msg1, Msg3, #{})).

redirect_to_hint_test() ->
start(),
RandAddr = hb_util:human_id(crypto:strong_rand_bytes(32)),
TestLoc = <<"http://test.computer">>,
Msg1 = test_process(<< RandAddr/binary, "?hint=", TestLoc/binary>>),
Msg2 = #{
<<"path">> => <<"schedule">>,
<<"method">> => <<"POST">>,
<<"body">> => Msg1
},
?assertMatch(
{ok, #{ <<"location">> := Location }} when is_binary(Location),
hb_converge:resolve(Msg1, Msg2, #{ scheduler_follow_hints => true })).

redirect_from_graphql_test() ->
start(),
Opts =
#{ store =>
[
{hb_store_fs, #{ prefix => "main-cache" }},
{hb_store_gateway, #{}}
]
},
{ok, Msg} = hb_cache:read(<<"0syT13r0s0tgPmIed95bJnuSqaD29HQNN8D3ElLSrsc">>, Opts),
?assertMatch(
{ok, #{ <<"location">> := Location }} when is_binary(Location),
hb_converge:resolve(
Msg,
#{
<<"path">> => <<"schedule">>,
<<"method">> => <<"POST">>,
<<"body">> =>
hb_message:attest(#{
<<"type">> => <<"Message">>,
<<"target">> =>
<<"0syT13r0s0tgPmIed95bJnuSqaD29HQNN8D3ElLSrsc">>,
<<"test-key">> => <<"Test-Val">>
}, hb:wallet())
},
#{}
)
).

get_schedule_test() ->
start(),
Msg1 = test_process(),
Expand Down

0 comments on commit 627c2fd

Please sign in to comment.