diff --git a/base/stream.jl b/base/stream.jl index d7ff4c8cc0b7f..4597fa1dd5059 100644 --- a/base/stream.jl +++ b/base/stream.jl @@ -44,13 +44,16 @@ abstract type LibuvStream <: IO end bytesavailable(s::LibuvStream) = bytesavailable(s.buffer) function eof(s::LibuvStream) - if isopen(s) # fast path - bytesavailable(s) <= 0 || return false - else - return bytesavailable(s) <= 0 - end + bytesavailable(s) > 0 && return false wait_readnb(s, 1) - return !isopen(s) && bytesavailable(s) <= 0 + # This function is race-y if used from multiple threads, but we guarantee + # it to never return false until the stream is definitively exhausted + # and that we won't return true if there's a readerror pending (it'll instead get thrown). + # This requires some careful ordering here (TODO: atomic loads) + bytesavailable(s) > 0 && return false + open = isopen(s) # must preceed readerror check + s.readerror === nothing || throw(s.readerror) + return !open end # Limit our default maximum read and buffer size, @@ -327,17 +330,25 @@ function check_open(x::Union{LibuvStream, LibuvServer}) end function wait_readnb(x::LibuvStream, nb::Int) + # fast path before iolock acquire + bytesavailable(x.buffer) >= nb && return + open = isopen(x) # must preceed readerror check + x.readerror === nothing || throw(x.readerror) + open || return iolock_begin() - if !isopen(x) || bytesavailable(x.buffer) >= nb # fast path - iolock_end() - return - end + # repeat fast path after iolock acquire, before other expensive work + bytesavailable(x.buffer) >= nb && (iolock_end(); return) + open = isopen(x) + x.readerror === nothing || throw(x.readerror) + open || (iolock_end(); return) + # now do the "real" work oldthrottle = x.throttle preserve_handle(x) lock(x.cond) try - while isopen(x) && bytesavailable(x.buffer) < nb + while bytesavailable(x.buffer) < nb x.readerror === nothing || throw(x.readerror) + isopen(x) || break x.throttle = max(nb, x.throttle) start_reading(x) # ensure we are reading iolock_end() @@ -351,6 +362,8 @@ function wait_readnb(x::LibuvStream, nb::Int) stop_reading(x) # stop reading iff there are currently no other read clients of the stream end if oldthrottle <= x.throttle <= nb + # if we're interleaving readers, we might not get back to the "original" throttle + # but we consider that an acceptable "risk", since we can't be quite sure what the intended value is now x.throttle = oldthrottle end unpreserve_handle(x) @@ -543,7 +556,6 @@ function uv_readcb(handle::Ptr{Cvoid}, nread::Cssize_t, buf::Ptr{Cvoid}) # remind the client that stream.buffer is full notify(stream.cond) elseif nread == UV_EOF - stream.readerror = EOFError() if isa(stream, TTY) stream.status = StatusEOF # libuv called uv_stop_reading already notify(stream.cond) @@ -589,7 +601,6 @@ function reseteof(x::TTY) iolock_begin() if x.status == StatusEOF x.status = StatusOpen - x.readerror isa EOFError && (x.readerror = nothing) end iolock_end() nothing @@ -772,40 +783,33 @@ function readbytes!(s::LibuvStream, a::Vector{UInt8}, nb::Int) @assert sbuf.seekable == false @assert sbuf.maxsize >= nb - local nread - if bytesavailable(sbuf) >= nb - nread = readbytes!(sbuf, a, nb) - iolock_end() - return nread - end - - if nb <= SZ_UNBUFFERED_IO # Under this limit we are OK with copying the array from the stream's buffer - while isopen(s) && bytesavailable(sbuf) < nb + function wait_locked(s, buf, nb) + while bytesavailable(buf) < nb + s.readerror === nothing || throw(s.readerror) + isopen(s) || break iolock_end() wait_readnb(s, nb) iolock_begin() end - nread = readbytes!(sbuf, a, nb) - iolock_end() - return nread end - nread = try - stop_reading(s) # Just playing it safe, since we are going to switch buffers. - newbuf = PipeBuffer(a, maxsize = nb) + if nb <= SZ_UNBUFFERED_IO # Under this limit we are OK with copying the array from the stream's buffer + wait_locked(s, sbuf, nb) + end + if bytesavailable(sbuf) >= nb + nread = readbytes!(sbuf, a, nb) + else + newbuf = PipeBuffer(a, maxsize=nb) newbuf.size = 0 # reset the write pointer to the beginning - s.buffer = newbuf - write(newbuf, sbuf) - iolock_end() - wait_readnb(s, Int(nb)) - iolock_begin() - compact(newbuf) - bytesavailable(newbuf) - finally - s.buffer = sbuf - if !isempty(s.cond) - start_reading(s) # resume reading iff there are currently other read clients of the stream + nread = try + s.buffer = newbuf + write(newbuf, sbuf) + wait_locked(s, newbuf, nb) + bytesavailable(newbuf) + finally + s.buffer = sbuf end + compact(newbuf) end iolock_end() return nread @@ -825,31 +829,30 @@ function unsafe_read(s::LibuvStream, p::Ptr{UInt8}, nb::UInt) @assert sbuf.seekable == false @assert sbuf.maxsize >= nb - if bytesavailable(sbuf) >= nb - unsafe_read(sbuf, p, nb) - elseif nb <= SZ_UNBUFFERED_IO # Under this limit we are OK with copying the array from the stream's buffer - while isopen(s) && bytesavailable(sbuf) < nb + function wait_locked(s, buf, nb) + while bytesavailable(buf) < nb + s.readerror === nothing || throw(s.readerror) + isopen(s) || throw(EOFError()) iolock_end() - wait_readnb(s, Int(nb)) + wait_readnb(s, nb) iolock_begin() end + end + + if nb <= SZ_UNBUFFERED_IO # Under this limit we are OK with copying the array from the stream's buffer + wait_locked(s, sbuf, Int(nb)) + end + if bytesavailable(sbuf) >= nb unsafe_read(sbuf, p, nb) else + newbuf = PipeBuffer(unsafe_wrap(Array, p, nb), maxsize=Int(nb)) + newbuf.size = 0 # reset the write pointer to the beginning try - stop_reading(s) # Just playing it safe, since we are going to switch buffers. - newbuf = PipeBuffer(unsafe_wrap(Array, p, nb), maxsize = Int(nb)) - newbuf.size = 0 # reset the write pointer to the beginning s.buffer = newbuf write(newbuf, sbuf) - iolock_end() - wait_readnb(s, Int(nb)) - iolock_begin() - nb == bytesavailable(newbuf) || throw(EOFError()) + wait_locked(s, newbuf, Int(nb)) finally s.buffer = sbuf - if !isempty(s.cond) - start_reading(s) # resume reading iff there are currently other read clients of the stream - end end end iolock_end() @@ -860,9 +863,9 @@ function read(this::LibuvStream, ::Type{UInt8}) iolock_begin() sbuf = this.buffer @assert sbuf.seekable == false - while isopen(this) && bytesavailable(sbuf) < 1 + while bytesavailable(sbuf) < 1 iolock_end() - wait_readnb(this, 1) + eof(this) && throw(EOFError()) iolock_begin() end c = read(sbuf, UInt8) @@ -871,7 +874,7 @@ function read(this::LibuvStream, ::Type{UInt8}) end function readavailable(this::LibuvStream) - wait_readnb(this, 1) + wait_readnb(this, 1) # unlike the other `read` family of functions, this one doesn't guarantee error reporting iolock_begin() buf = this.buffer @assert buf.seekable == false @@ -884,25 +887,29 @@ function readuntil(x::LibuvStream, c::UInt8; keep::Bool=false) iolock_begin() buf = x.buffer @assert buf.seekable == false - if isopen(x) && !occursin(c, buf) # fast path - preserve_handle(x) - lock(x.cond) - try - while isopen(x) && !occursin(c, x.buffer) - x.readerror === nothing || throw(x.readerror) - start_reading(x) # ensure we are reading - iolock_end() - wait(x.cond) + if !occursin(c, buf) # fast path checks first + x.readerror === nothing || throw(x.readerror) + if isopen(x) + preserve_handle(x) + lock(x.cond) + try + while !occursin(c, x.buffer) + x.readerror === nothing || throw(x.readerror) + isopen(x) || break + start_reading(x) # ensure we are reading + iolock_end() + wait(x.cond) + unlock(x.cond) + iolock_begin() + lock(x.cond) + end + finally + if isempty(x.cond) + stop_reading(x) # stop reading iff there are currently no other read clients of the stream + end unlock(x.cond) - iolock_begin() - lock(x.cond) + unpreserve_handle(x) end - finally - if isempty(x.cond) - stop_reading(x) # stop reading iff there are currently no other read clients of the stream - end - unlock(x.cond) - unpreserve_handle(x) end end bytes = readuntil(buf, c, keep=keep) diff --git a/stdlib/REPL/test/repl.jl b/stdlib/REPL/test/repl.jl index 1d2a29d3057c5..6bf697cda7dc1 100644 --- a/stdlib/REPL/test/repl.jl +++ b/stdlib/REPL/test/repl.jl @@ -750,11 +750,20 @@ let exename = Base.julia_cmd() @test output == "1\r\nexit()\r\n1\r\n\r\njulia> " end @test bytesavailable(pty_master) == 0 - @test try # possibly consume child-exited notification - eof(pty_master) - catch ex - (ex isa Base.IOError && ex.code == Base.UV_EIO) || rethrow() + @test if Sys.iswindows() || Sys.isbsd() eof(pty_master) + else + # Some platforms (such as linux) report EIO instead of EOF + # possibly consume child-exited notification + # for example, see discussion in https://bugs.python.org/issue5380 + try + eof(pty_master) && !Sys.islinux() + catch ex + (ex isa Base.IOError && ex.code == Base.UV_EIO) || rethrow() + @test_throws ex eof(pty_master) # make sure the error is sticky + pty_master.readerror = nothing + eof(pty_master) + end end @test read(pty_master, String) == "" wait(p)