Skip to content

Commit

Permalink
fix jl_putc printing array, remove Deserializer type, give AsyncStrea…
Browse files Browse the repository at this point in the history
…m synchronous methods, fix web_repl (by replacing code with original from JuliaLang).

closes #50. closes #46. closes #40 (i think). closes #25.
  • Loading branch information
vtjnash committed Nov 25, 2012
1 parent 75cb6c3 commit ac786c8
Show file tree
Hide file tree
Showing 6 changed files with 295 additions and 359 deletions.
2 changes: 1 addition & 1 deletion base/client.jl
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ quit() = exit()
function repl_callback(ast::ANY, show_value)
# use root task to execute user input
global _repl_enough_stdin = true
#stop_reading(STDIN)
stop_reading(STDIN)
STDIN.readcb = false
put(_jl_repl_channel, (ast, show_value))
end
Expand Down
205 changes: 103 additions & 102 deletions base/multi.jl
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ function add_workers(PGRP::ProcessGroup, w::Array{Any,1})
push(PGRP.workers, w[i])
w[i].id = PGRP.np+i
send_msg_now(w[i], w[i].id, newlocs)
Deserializer(message_handler_loop,w[i].socket)
create_message_handler_loop(w[i].socket)
end
PGRP.locs = newlocs
PGRP.np += n
Expand Down Expand Up @@ -720,6 +720,7 @@ function perform_work(job::WorkItem)
global Waiting, Workqueue
local result
try
ccall(:jl_register_toplevel_eh, Void, ())
if isa(job.task,Task)
# continuing interrupted work item
arg = job.argument
Expand All @@ -732,11 +733,10 @@ function perform_work(job::WorkItem)
result = yieldto(job.task)
end
catch e
#show(e)
print("exception on ", myid(), ": ")
show(e)
println()
result = e
result = e.e
end
# restart job by yielding back to whatever task just switched to us
job.task = current_task().last
Expand Down Expand Up @@ -799,8 +799,6 @@ function deliver_result(sock::Stream, msg, oid, value)
end
end

deliver_result(sock::Deserializer,msg,oid,value) = deliver_result(sock.stream,msg,oid,value)

const _jl_empty_cell_ = {}
function deliver_result(sock::(), msg, oid, value_thunk)
global Waiting
Expand Down Expand Up @@ -864,96 +862,99 @@ function accept_handler(server::TcpSocket, status::Int32)
if err!=0
print("accept error: ", _uv_lasterror(globalEventLoop()), "\n")
else
Deserializer(message_handler_loop,client)
create_message_handler_loop(client)
end
end

type DisconnectException <: Exception end

function message_handler_loop(this::Deserializer)
global PGRP
#println("message_handler_loop")
refs = (PGRP::ProcessGroup).refs
if PGRP.np == 0
# first connection; get process group info from client
PGRP.myid = force(deserialize(this))
PGRP.locs = locs = force(deserialize(this))
#print("\nLocation: ",locs,"\nId:",PGRP.myid,"\n")
# joining existing process group
PGRP.np = length(PGRP.locs)
PGRP.workers = w = cell(PGRP.np)
w[1] = Worker("", 0, this.stream, 1)
for i = 2:(PGRP.myid-1)
w[i] = Worker(locs[i].host, locs[i].port)
w[i].id = i
Deserializer(message_handler_loop,w[i].socket)
send_msg_now(w[i], :identify_socket, PGRP.myid)
end
w[PGRP.myid] = LocalProcess()
for i = (PGRP.myid+1):PGRP.np
w[i] = nothing
end
end
#println("loop")
while true
#try
msg = force(deserialize(this))
#println("got msg: ",msg)
# handle message
if is(msg, :call) || is(msg, :call_fetch) || is(msg, :call_wait)
id = force(deserialize(this))
f = deserialize(this)
args = deserialize(this)
#print("$(myid()) got call $id\n")
wi = schedule_call(id, f, args)
if is(msg, :call_fetch)
wi.notify = (this, :call_fetch, id, wi.notify)
elseif is(msg, :call_wait)
wi.notify = (this, :wait, id, wi.notify)
function create_message_handler_loop(this::AsyncStream) #returns immediately
enq_work(@task begin
global PGRP
#println("message_handler_loop")
refs = (PGRP::ProcessGroup).refs
start_reading(this)
if PGRP.np == 0
# first connection; get process group info from client
PGRP.myid = force(deserialize(this))
PGRP.locs = locs = force(deserialize(this))
#print("\nLocation: ",locs,"\nId:",PGRP.myid,"\n")
# joining existing process group
PGRP.np = length(PGRP.locs)
PGRP.workers = w = cell(PGRP.np)
w[1] = Worker("", 0, this, 1)
for i = 2:(PGRP.myid-1)
w[i] = Worker(locs[i].host, locs[i].port)
w[i].id = i
create_message_handler_loop(w[i].socket)
send_msg_now(w[i], :identify_socket, PGRP.myid)
end
elseif is(msg, :do)
f = deserialize(this)
args = deserialize(this)
#print("got args: $args\n")
let func=f, ar=args
enq_work(WorkItem(()->apply(force(func),force(ar))))
w[PGRP.myid] = LocalProcess()
for i = (PGRP.myid+1):PGRP.np
w[i] = nothing
end
elseif is(msg, :result)
# used to deliver result of wait or fetch
mkind = force(deserialize(this))
oid = force(deserialize(this))
val = deserialize(this)
deliver_result((), mkind, oid, val)
elseif is(msg, :identify_socket)
otherid = force(deserialize(this))
_jl_identify_socket(otherid, this.stream)
else
# the synchronization messages
oid = force(deserialize(this))::(Int,Int)
wi = lookup_ref(oid)
if wi.done
deliver_result(this.stream, msg, oid, work_result(wi))
end
#println("loop")
while true
#try
msg = force(deserialize(this))
#println("got msg: ",msg)
# handle message
if is(msg, :call) || is(msg, :call_fetch) || is(msg, :call_wait)
id = force(deserialize(this))
f = deserialize(this)
args = deserialize(this)
#print("$(myid()) got call $id\n")
wi = schedule_call(id, f, args)
if is(msg, :call_fetch)
wi.notify = (this, :call_fetch, id, wi.notify)
elseif is(msg, :call_wait)
wi.notify = (this, :wait, id, wi.notify)
end
elseif is(msg, :do)
f = deserialize(this)
args = deserialize(this)
#print("got args: $args\n")
let func=f, ar=args
enq_work(WorkItem(()->apply(force(func),force(ar))))
end
elseif is(msg, :result)
# used to deliver result of wait or fetch
mkind = force(deserialize(this))
oid = force(deserialize(this))
val = deserialize(this)
deliver_result((), mkind, oid, val)
elseif is(msg, :identify_socket)
otherid = force(deserialize(this))
_jl_identify_socket(otherid, this)
else
# add to WorkItem's notify list
# TODO: should store the worker here, not the socket,
# so we don't need to look up the worker later
wi.notify = (this, msg, oid, wi.notify)
# the synchronization messages
oid = force(deserialize(this))::(Int,Int)
wi = lookup_ref(oid)
if wi.done
deliver_result(this, msg, oid, work_result(wi))
else
# add to WorkItem's notify list
# TODO: should store the worker here, not the socket,
# so we don't need to look up the worker later
wi.notify = (this, msg, oid, wi.notify)
end
end
end
#catch e
# if isa(e,EOFError)
# print("eof. $(myid()) exiting\n")
# stop_reading(this.stream)
# # TODO: remove machine from group
# throw(DisconnectException())
# else
# print("deserialization error: ", e, "\n")
# #while nb_available(sock) > 0 #|| select(sock)
# # read(sock, Uint8)
# #end
# end
#end
end
#catch e
# if isa(e,EOFError)
# print("eof. $(myid()) exiting\n")
# stop_reading(this)
# # TODO: remove machine from group
# throw(DisconnectException())
# else
# print("deserialization error: ", e, "\n")
# #while nb_available(sock) > 0 #|| select(sock)
# # read(sock, Uint8)
# #end
# end
#end
end
end)
end

## worker creation and setup ##
Expand Down Expand Up @@ -1017,21 +1018,22 @@ function start_remote_workers(machines, cmds)
break
end
end
w[i] = wrker = Worker(hostname, port)

# redirect console output from workers to the client's stdout:
start_reading(stream,function(stream::AsyncStream,nread::Int)
if(nread>0)
try
line = readbytes(stream.buffer, nread)
print("\tFrom worker $(wrker.id):\t",line)
catch e
println("\tError parsing reply from worker $(wrker.id):\t",e)
return false
let wrker = Worker(hostname, port)
w[i] = wrker
# redirect console output from workers to the client's stdout:
start_reading(stream,function(stream::AsyncStream,nread::Int)
if(nread>0)
try
line = readbytes(stream.buffer, nread)
print("\tFrom worker $(wrker.id):\t",line)
catch e
println("\tError parsing reply from worker $(wrker.id):\t",e)
return false
end
end
end
true
end)
true
end)
end
end
w
end
Expand Down Expand Up @@ -1613,13 +1615,12 @@ function event_loop(isclient)
multi_cb_handles.work_cb = SingleAsyncWork(globalEventLoop(),_jl_work_cb)
multi_cb_handles.fgcm = SingleAsyncWork(globalEventLoop(),(args...)->flush_gc_msgs());
timer = TimeoutAsyncWork(globalEventLoop(),(args...)->queueAsync(multi_cb_handles.work_cb))
startTimer(timer,int64(1),int64(10000)) #do work every 10s
startTimer(timer,int64(1),int64(1000)) #do work every 10s
iserr, lasterr = false, ()
while true
try
ccall(:jl_register_toplevel_eh, Void, ())
if iserr
println(typeof(lasterr))
show(lasterr)
iserr, lasterr = false, ()
else
Expand Down
43 changes: 0 additions & 43 deletions base/serialize.jl
Original file line number Diff line number Diff line change
Expand Up @@ -243,49 +243,6 @@ force(x::Function) = x()
# we actually make objects. this allows for constructing objects that might
# interfere with I/O by reading, writing, blocking, etc.

type Deserializer <: Stream #TODO: rename to SyncStream
stream::AsyncStream
function Deserializer(loop::Function,stream::AsyncStream)
this = new(stream)
enq_work(() -> loop(this))
start_reading(stream)
this
end
end
show(io::IO,d::Deserializer) = print(io,"Deserializer()")

function read{T}(this::Deserializer, a::Array{T})
if isa(T, BitsKind)
nb = numel(a)*sizeof(T)
buf = this.stream.buffer
assert(buf.seekable == false)
assert(buf.maxsize >= nb)
wait_readnb(this.stream,nb)
read(this.stream.buffer, a)
return a
else
#error("Read from Buffer only supports bits types or arrays of bits types; got $T.")
error("Read from Buffer only supports bits types or arrays of bits types")
end
end

function read(this::Deserializer,::Type{Uint8})
buf = this.stream.buffer
assert(buf.seekable == false)
wait_readnb(this.stream,1)
read(buf,Uint8)
end

function readline(this::Deserializer)
buf = this.stream.buffer
assert(buf.seekable == false)
wait_readline(this.stream)
readline(buf)
end

write(::Deserializer,args...) = error("write not implemented for deserializer")
position(d::Deserializer) = d.pos

function deserialize(s)
b = int32(read(s, Uint8))
if b == 0
Expand Down
Loading

0 comments on commit ac786c8

Please sign in to comment.