Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add stream shutdown and support half-duplex operation #40783

Merged
merged 1 commit into from
Jul 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ Standard library changes
overflow in most cases. The new function `checked_length` is now available, which will try to use checked
arithmetic to error if the result may be wrapping. Or use a package such as SaferIntegers.jl when
constructing the range. ([#40382])
* TCP socket objects now expose `shutdown` functionality and support half-open mode usage ([#40783]).

#### InteractiveUtils
* A new macro `@time_imports` for reporting any time spent importing packages and their dependencies ([#41612])
Expand Down
1 change: 1 addition & 0 deletions base/coreio.jl
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ write(::DevNull, ::UInt8) = 1
unsafe_write(::DevNull, ::Ptr{UInt8}, n::UInt)::Int = n
close(::DevNull) = nothing
wait_close(::DevNull) = wait()
bytesavailable(io::DevNull) = 0

let CoreIO = Union{Core.CoreSTDOUT, Core.CoreSTDERR}
global write(io::CoreIO, x::UInt8) = Core.write(io, x)
Expand Down
1 change: 1 addition & 0 deletions base/exports.jl
Original file line number Diff line number Diff line change
Expand Up @@ -803,6 +803,7 @@ export

# I/O and events
close,
shutdown,
countlines,
eachline,
readeach,
Expand Down
124 changes: 72 additions & 52 deletions base/io.jl
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,49 @@ function isopen end
Close an I/O stream. Performs a [`flush`](@ref) first.
"""
function close end

"""
shutdown(stream)

Shutdown the write half of a full-duplex I/O stream. Performs a [`flush`](@ref)
first. Notify the other end that no more data will be written to the underlying
file. This is not supported by all IO types.

# Examples
```jldoctest
julia> io = Base.BufferStream(); # this never blocks, so we can read and write on the same Task

julia> write(io, "request");

julia> # calling `read(io)` here would block forever

julia> shutdown(io);

julia> read(io, String)
"request"
"""
vtjnash marked this conversation as resolved.
Show resolved Hide resolved
function shutdown end

"""
flush(stream)

Commit all currently buffered writes to the given stream.
"""
function flush end
function wait_readnb end
function wait_close end

"""
bytesavailable(io)

Return the number of bytes available for reading before a read from this stream or buffer will block.

# Examples
```jldoctest
julia> io = IOBuffer("JuliaLang is a GitHub organization");

julia> bytesavailable(io)
34
```
"""
function bytesavailable end

"""
Expand All @@ -81,7 +121,7 @@ function readavailable end
"""
isreadable(io) -> Bool

Return `true` if the specified IO object is readable (if that can be determined).
Return `false` if the specified IO object is not readable.

# Examples
```jldoctest
Expand All @@ -99,12 +139,12 @@ true
julia> rm("myfile.txt")
```
"""
function isreadable end
isreadable(io::IO) = isopen(io)

"""
iswritable(io) -> Bool

Return `true` if the specified IO object is writable (if that can be determined).
Return `false` if the specified IO object is not writable.

# Examples
```jldoctest
Expand All @@ -122,10 +162,23 @@ false
julia> rm("myfile.txt")
```
"""
function iswritable end
function copy end
iswritable(io::IO) = isopen(io)

"""
eof(stream) -> Bool

Test whether an I/O stream is at end-of-file. If the stream is not yet exhausted, this
function will block to wait for more data if necessary, and then return `false`. Therefore
it is always safe to read one byte after seeing `eof` return `false`. `eof` will return
`false` as long as buffered data is still available, even if the remote end of a connection
is closed.
"""
function eof end

function copy end
function wait_readnb end
function wait_close end

"""
read(io::IO, T)

Expand Down Expand Up @@ -357,65 +410,37 @@ end
function pipe_reader end
function pipe_writer end

for f in (:flush, :shutdown, :iswritable)
@eval $(f)(io::AbstractPipe) = $(f)(pipe_writer(io)::IO)
end
write(io::AbstractPipe, byte::UInt8) = write(pipe_writer(io)::IO, byte)
write(to::IO, from::AbstractPipe) = write(to, pipe_reader(from))
unsafe_write(io::AbstractPipe, p::Ptr{UInt8}, nb::UInt) = unsafe_write(pipe_writer(io)::IO, p, nb)::Union{Int,UInt}
buffer_writes(io::AbstractPipe, args...) = buffer_writes(pipe_writer(io)::IO, args...)
flush(io::AbstractPipe) = flush(pipe_writer(io)::IO)

for f in (
# peek/mark interface
:mark, :unmark, :reset, :ismarked,
# Simple reader functions
:read, :readavailable, :bytesavailable, :reseteof, :isreadable)
@eval $(f)(io::AbstractPipe) = $(f)(pipe_reader(io)::IO)
end
read(io::AbstractPipe, byte::Type{UInt8}) = read(pipe_reader(io)::IO, byte)::UInt8
unsafe_read(io::AbstractPipe, p::Ptr{UInt8}, nb::UInt) = unsafe_read(pipe_reader(io)::IO, p, nb)
read(io::AbstractPipe) = read(pipe_reader(io)::IO)
readuntil(io::AbstractPipe, arg::UInt8; kw...) = readuntil(pipe_reader(io)::IO, arg; kw...)
readuntil(io::AbstractPipe, arg::AbstractChar; kw...) = readuntil(pipe_reader(io)::IO, arg; kw...)
readuntil(io::AbstractPipe, arg::AbstractString; kw...) = readuntil(pipe_reader(io)::IO, arg; kw...)
readuntil(io::AbstractPipe, arg::AbstractVector; kw...) = readuntil(pipe_reader(io)::IO, arg; kw...)
readuntil_vector!(io::AbstractPipe, target::AbstractVector, keep::Bool, out) = readuntil_vector!(pipe_reader(io)::IO, target, keep, out)
readbytes!(io::AbstractPipe, target::AbstractVector{UInt8}, n=length(target)) = readbytes!(pipe_reader(io)::IO, target, n)

for f in (
# peek/mark interface
:mark, :unmark, :reset, :ismarked,
# Simple reader functions
:readavailable, :isreadable)
@eval $(f)(io::AbstractPipe) = $(f)(pipe_reader(io)::IO)
end
peek(io::AbstractPipe, ::Type{T}) where {T} = peek(pipe_reader(io)::IO, T)::T
wait_readnb(io::AbstractPipe, nb::Int) = wait_readnb(pipe_reader(io)::IO, nb)
eof(io::AbstractPipe) = eof(pipe_reader(io)::IO)::Bool

iswritable(io::AbstractPipe) = iswritable(pipe_writer(io)::IO)
isopen(io::AbstractPipe) = isopen(pipe_writer(io)::IO) || isopen(pipe_reader(io)::IO)
close(io::AbstractPipe) = (close(pipe_writer(io)::IO); close(pipe_reader(io)::IO))
wait_readnb(io::AbstractPipe, nb::Int) = wait_readnb(pipe_reader(io)::IO, nb)
wait_close(io::AbstractPipe) = (wait_close(pipe_writer(io)::IO); wait_close(pipe_reader(io)::IO))

"""
bytesavailable(io)

Return the number of bytes available for reading before a read from this stream or buffer will block.

# Examples
```jldoctest
julia> io = IOBuffer("JuliaLang is a GitHub organization");

julia> bytesavailable(io)
34
```
"""
bytesavailable(io::AbstractPipe) = bytesavailable(pipe_reader(io)::IO)
bytesavailable(io::DevNull) = 0

"""
eof(stream) -> Bool

Test whether an I/O stream is at end-of-file. If the stream is not yet exhausted, this
function will block to wait for more data if necessary, and then return `false`. Therefore
it is always safe to read one byte after seeing `eof` return `false`. `eof` will return
`false` as long as buffered data is still available, even if the remote end of a connection
is closed.
"""
eof(io::AbstractPipe) = eof(pipe_reader(io)::IO)::Bool
reseteof(io::AbstractPipe) = reseteof(pipe_reader(io)::IO)


# Exception-safe wrappers (io = open(); try f(io) finally close(io))

Expand Down Expand Up @@ -1119,11 +1144,6 @@ ismarked(io::IO) = io.mark >= 0
# Make sure all IO streams support flush, even if only as a no-op,
# to make it easier to write generic I/O code.

"""
flush(stream)

Commit all currently buffered writes to the given stream.
"""
flush(io::IO) = nothing

"""
Expand Down
6 changes: 6 additions & 0 deletions base/iobuffer.jl
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,12 @@ end

eof(io::GenericIOBuffer) = (io.ptr-1 == io.size)

function shutdown(io::GenericIOBuffer)
io.writable = false
# OR throw(_UVError("shutdown", UV_ENOTSOCK))
nothing
end

@noinline function close(io::GenericIOBuffer{T}) where T
io.readable = false
io.writable = false
Expand Down
1 change: 1 addition & 0 deletions base/libuv.jl
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ end
function uv_alloc_buf end
function uv_readcb end
function uv_writecb_task end
function uv_shutdowncb_task end
function uv_return_spawn end
function uv_asynccb end
function uv_timercb end
Expand Down
1 change: 1 addition & 0 deletions base/process.jl
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ function setup_stdio(stdio::Union{IOBuffer, BufferStream}, child_readable::Bool)
@warn "Process error" exception=(ex, catch_backtrace())
finally
close(parent)
child_readable || shutdown(stdio)
end
end
catch ex
Expand Down
Loading