Skip to content

Commit

Permalink
Reuse sockets for synchronous calls
Browse files Browse the repository at this point in the history
  • Loading branch information
savq committed Sep 29, 2022
1 parent bef5ee9 commit 914d502
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 18 deletions.
20 changes: 14 additions & 6 deletions src/Malt.jl
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ Malt.Worker(0x0000, Process(`…`, ProcessRunning))
mutable struct Worker
port::UInt16
proc::Process
sync_socket::Union{TCPSocket, Nothing}

function Worker(;exeflags=[])
# Spawn process
Expand All @@ -45,7 +46,7 @@ mutable struct Worker
port = parse(UInt16, port_str)

# There's no reason to keep the worker process alive after the manager loses its handle.
w = finalizer(w -> @async(stop(w)), new(port, proc))
w = finalizer(w -> @async(stop(w)), new(port, proc, nothing))
atexit(() -> stop(w))

return w
Expand Down Expand Up @@ -88,15 +89,22 @@ end
function _recv(socket)
try
response = deserialize(socket)
response.result
response #.result
catch e
rethrow(e)
end
end

function _send(w::Worker, msg)
function _send_sync(w::Worker, msg)
isrunning(w) || throw(TerminatedWorkerException())
_recv(_send_msg(w.port, msg))

# Create or replace socket if necessary
if !(isa(w.sync_socket, TCPSocket) && isopen(w.sync_socket))
w.sync_socket = connect(w.port)
end

serialize(w.sync_socket, msg)
_recv(w.sync_socket)
end

# TODO: Unwrap TaskFailedExceptions
Expand Down Expand Up @@ -150,7 +158,7 @@ end
Shorthand for `fetch(Malt.remotecall(…))`. Blocks and then returns the result of the remote call.
"""
function remotecall_fetch(f, w::Worker, args...; kwargs...)
_send(w, _new_call_msg(true, f, args..., kwargs...))
_send_sync(w, _new_call_msg(true, f, args..., kwargs...))
end


Expand All @@ -160,7 +168,7 @@ end
Shorthand for `wait(Malt.remotecall(…))`. Blocks and discards the resulting value.
"""
function remotecall_wait(f, w::Worker, args...; kwargs...)
_send(w, _new_call_msg(false, f, args..., kwargs...))
_send_sync(w, _new_call_msg(false, f, args..., kwargs...))
end


Expand Down
25 changes: 13 additions & 12 deletions src/worker.jl
Original file line number Diff line number Diff line change
Expand Up @@ -33,37 +33,38 @@ function serve(server::Sockets.TCPServer)

# Handle request asynchronously
latest = @async begin
msg = deserialize(sock)
if get(msg, :header, nothing) === :interrupt
if latest isa Task && !istaskdone(latest)
Base.throwto(latest, InterruptException)
while isopen(sock)
msg = deserialize(sock)
if get(msg, :header, nothing) === :interrupt
interrupt(latest)
else
@debug(msg)
handle(Val(msg.header), sock, msg)
end
else
@debug(msg)
handle(Val(msg.header), sock, msg)
end
end
catch InterruptException
# Rethrow interrupt in the latest task
@debug("Caught interrupt!")
if latest isa Task && !istaskdone(latest)
Base.throwto(latest, InterruptException)
end
interrupt(latest)
continue
end
end
@debug("Closed server socket. Bye!")
end

interrupt(t::Task) = istaskdone(t) || Base.throwto(latest, InterruptException)
interrupt(t::Nothing) = nothing

function handle(::Val{:call}, socket, msg)
try
result = msg.f(msg.args...; msg.kwargs...)

# @debug("Result", result)
serialize(socket, (status=:ok, result=(msg.send_result ? result : nothing)))
serialize(socket, msg.send_result ? result : nothing)
catch e
# @debug("Exception!", e)
serialize(socket, (status=:err, result=e))
serialize(socket, e)
end
end

Expand Down

0 comments on commit 914d502

Please sign in to comment.