Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Channel constructor requires an explicit size. Also implements 0-sized channels #18832

Merged
merged 1 commit into from
Oct 21, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 88 additions & 14 deletions base/channels.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,49 @@

abstract AbstractChannel

const DEF_CHANNEL_SZ=32

type Channel{T} <: AbstractChannel
cond_take::Condition # waiting for data to become available
cond_put::Condition # waiting for a writeable slot
state::Symbol

data::Array{T,1}
sz_max::Int # maximum size of channel
sz_max::Int # maximum size of channel

# Used when sz_max == 0, i.e., an unbuffered channel.
takers::Array{Condition}

function Channel(sz)
sz_max = sz == typemax(Int) ? typemax(Int) - 1 : sz
new(Condition(), Condition(), :open, Array{T}(0), sz_max)
function Channel(sz::Float64)
if sz == Inf
Channel{T}(typemax(Int))
else
Channel{T}(convert(Int, sz))
end
end
function Channel(sz::Integer)
if sz < 0
throw(ArgumentError("Channel size must be either 0, a positive integer or Inf"))
end
new(Condition(), Condition(), :open, Array{T}(0), sz, Array{Condition}(0))
end

# deprecated empty constructor
function Channel()
depwarn(string("The empty constructor Channel() is deprecated. ",
Copy link
Contributor

@tkelman tkelman Oct 18, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why isn't this in deprecated.jl ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Channel{T}() constructor needs to be inside the type definition, while Channel() which is equivalent of Channel{Any}() can be outside. Thus added both in channels.jl itself.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why can't it be an outer constructor? especially with two different signatures in different places that should be removed, those comments are going to be easy to miss

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How?

Channel() = Channel{Any}(32) is fine.
Channel{T}() = Channel{T}(32) is not callable.

julia> type Foo{T}
         foo::T
       end

julia> Foo{T}() = Foo{T}(1)
WARNING: static parameter T does not occur in signature for Type at REPL[2]:1.
The method will not be callable.
Foo{T}

"The channel size needs to be specified explictly. ",
"Defaulting to Channel{$T}(32)."), :Channel)
Channel(32)
end
end

Channel(sz::Int = DEF_CHANNEL_SZ) = Channel{Any}(sz)
Channel(sz) = Channel{Any}(sz)

# deprecated empty constructor
Channel() = Channel{Any}()

closed_exception() = InvalidStateException("Channel is closed.", :closed)

isbuffered(c::Channel) = c.sz_max==0 ? false : true

"""
close(c::Channel)

Expand All @@ -46,9 +69,16 @@ end
put!(c::Channel, v)

Appends an item `v` to the channel `c`. Blocks if the channel is full.

For unbuffered channels, blocks until a `take!` is performed by a different
task.
"""
function put!(c::Channel, v)
!isopen(c) && throw(closed_exception())
isbuffered(c) ? put_buffered(c,v) : put_unbuffered(c,v)
end

function put_buffered(c::Channel, v)
while length(c.data) == c.sz_max
wait(c.cond_put)
end
Expand All @@ -57,19 +87,42 @@ function put!(c::Channel, v)
v
end

function put_unbuffered(c::Channel, v)
while length(c.takers) == 0
notify(c.cond_take, nothing, true, false) # Required to handle wait() on 0-sized channels
wait(c.cond_put)
end
cond_taker = shift!(c.takers)
notify(cond_taker, v, false, false)
v
end

push!(c::Channel, v) = put!(c, v)

function fetch(c::Channel)
"""
fetch(c::Channel)

Waits for and gets the first available item from the channel. Does not
remove the item. `fetch` is unsupported on an unbuffered (0-size) channel.
"""
fetch(c::Channel) = isbuffered(c) ? fetch_buffered(c) : fetch_unbuffered(c)
function fetch_buffered(c::Channel)
wait(c)
c.data[1]
end
fetch_unbuffered(c::Channel) = throw(ErrorException("`fetch` is not supported on an unbuffered Channel."))


"""
take!(c::Channel)

Removes and returns a value from a `Channel`. Blocks till data is available.
Removes and returns a value from a `Channel`. Blocks until data is available.

For unbuffered channels, blocks until a `put!` is performed by a different
task.
"""
function take!(c::Channel)
take!(c::Channel) = isbuffered(c) ? take_buffered(c) : take_unbuffered(c)
function take_buffered(c::Channel)
wait(c)
v = shift!(c.data)
notify(c.cond_put, nothing, false, false) # notify only one, since only one slot has become available for a put!.
Expand All @@ -78,13 +131,35 @@ end

shift!(c::Channel) = take!(c)

# 0-size channel
function take_unbuffered(c::Channel)
!isopen(c) && throw(closed_exception())
cond_taker = Condition()
push!(c.takers, cond_taker)
notify(c.cond_put, nothing, false, false)
try
return wait(cond_taker)
catch e
if isa(e, InterruptException)
# remove self from the list of takers
filter!(x -> x != cond_taker, c.takers)
else
rethrow(e)
end
end
end

"""
isready(c::Channel)

Determine whether a `Channel` has a value stored to it.
`isready` on `Channel`s is non-blocking.
Determine whether a `Channel` has a value stored to it. Returns
immediately, does not block.

For unbuffered channels returns `true` if there are tasks waiting
on a `put!`.
"""
isready(c::Channel) = n_avail(c) > 0
n_avail(c::Channel) = isbuffered(c) ? length(c.data) : n_waiters(c.cond_put)

function wait(c::Channel)
while !isready(c)
Expand All @@ -97,12 +172,11 @@ end
function notify_error(c::Channel, err)
notify_error(c.cond_take, err)
notify_error(c.cond_put, err)
foreach(x->notify_error(x, err), c.takers)
end

eltype{T}(::Type{Channel{T}}) = T

n_avail(c::Channel) = length(c.data)

show(io::IO, c::Channel) = print(io, "$(typeof(c))(sz_max:$(c.sz_max),sz_curr:$(n_avail(c)))")

start{T}(c::Channel{T}) = Ref{Nullable{T}}()
Expand Down
3 changes: 3 additions & 0 deletions base/deprecated.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1033,4 +1033,7 @@ end))
@deprecate_binding cycle Iterators.cycle
@deprecate_binding repeated Iterators.repeated

# NOTE: Deprecation of Channel{T}() is implemented in channels.jl.
# To be removed from there when 0.6 deprecations are removed.

# End deprecations scheduled for 0.6
11 changes: 7 additions & 4 deletions base/docs/helpdb/Base.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1426,13 +1426,16 @@ endof
"""
Channel{T}(sz::Int)

Constructs a `Channel` that can hold a maximum of `sz` objects of type `T`. `put!` calls on
a full channel block till an object is removed with `take!`.
Constructs a `Channel` with an internal buffer that can hold a maximum of `sz` objects
of type `T`. `put!` calls on a full channel block until an object is removed with `take!`.

`Channel(0)` constructs an unbuffered channel. `put!` blocks until a matching `take!` is called.
And vice-versa.

Other constructors:

- `Channel()` - equivalent to `Channel{Any}(32)`
- `Channel(sz::Int)` equivalent to `Channel{Any}(sz)`
- `Channel(Inf)` - equivalent to `Channel{Any}(typemax(Int))`
- `Channel(sz)` equivalent to `Channel{Any}(sz)`
"""
Channel

Expand Down
1 change: 1 addition & 0 deletions base/event.jl
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ end

notify_error(c::Condition, err) = notify(c, err, true, true)

n_waiters(c::Condition) = length(c.waitq)

# schedule an expression to run asynchronously, with minimal ceremony
"""
Expand Down
1 change: 0 additions & 1 deletion base/multi.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1175,7 +1175,6 @@ Waits and fetches a value from `x` depending on the type of `x`. Does not remove
is an exception, throws a `RemoteException` which captures the remote exception and backtrace.
* `RemoteChannel`: Wait for and get the value of a remote reference. Exceptions raised are
same as for a `Future` .
* `Channel` : Wait for and get the first available item from the channel.
"""
fetch(x::ANY) = x

Expand Down
75 changes: 44 additions & 31 deletions doc/stdlib/parallel.rst
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,53 @@ Tasks

.. Docstring generated from Julia source

Constructs a ``Channel`` that can hold a maximum of ``sz`` objects of type ``T``\ . ``put!`` calls on a full channel block till an object is removed with ``take!``\ .
Constructs a ``Channel`` with an internal buffer that can hold a maximum of ``sz`` objects of type ``T``\ . ``put!`` calls on a full channel block until an object is removed with ``take!``\ .

``Channel(0)`` constructs an unbuffered channel. ``put!`` blocks until a matching ``take!`` is called. And vice-versa.

Other constructors:

* ``Channel()`` - equivalent to ``Channel{Any}(32)``
* ``Channel(sz::Int)`` equivalent to ``Channel{Any}(sz)``
* ``Channel(Inf)`` - equivalent to ``Channel{Any}(typemax(Int))``
* ``Channel(sz)`` equivalent to ``Channel{Any}(sz)``

.. function:: put!(c::Channel, v)

.. Docstring generated from Julia source

Appends an item ``v`` to the channel ``c``\ . Blocks if the channel is full.

For unbuffered channels, blocks until a ``take!`` is performed by a different task.

.. function:: take!(c::Channel)

.. Docstring generated from Julia source

Removes and returns a value from a ``Channel``\ . Blocks until data is available.

For unbuffered channels, blocks until a ``put!`` is performed by a different task.

.. function:: isready(c::Channel)

.. Docstring generated from Julia source

Determine whether a ``Channel`` has a value stored to it. Returns immediately, does not block.

For unbuffered channels returns ``true`` if there are tasks waiting on a ``put!``\ .

.. function:: fetch(c::Channel)

.. Docstring generated from Julia source

Waits for and gets the first available item from the channel. Does not remove the item. ``fetch`` is unsupported on an unbuffered (0-size) channel.

.. function:: close(c::Channel)

.. Docstring generated from Julia source

Closes a channel. An exception is thrown by:

* ``put!`` on a closed channel.
* ``take!`` and ``fetch`` on an empty, closed channel.

General Parallel Computing Support
----------------------------------
Expand Down Expand Up @@ -336,7 +377,6 @@ General Parallel Computing Support

* ``Future``\ : Wait for and get the value of a Future. The fetched value is cached locally. Further calls to ``fetch`` on the same reference return the cached value. If the remote value is an exception, throws a ``RemoteException`` which captures the remote exception and backtrace.
* ``RemoteChannel``\ : Wait for and get the value of a remote reference. Exceptions raised are same as for a ``Future`` .
* ``Channel`` : Wait for and get the first available item from the channel.

.. function:: remotecall_wait(f, id::Integer, args...; kwargs...)

Expand All @@ -362,30 +402,12 @@ General Parallel Computing Support

Store a value to a ``Future`` ``rr``\ . ``Future``\ s are write-once remote references. A ``put!`` on an already set ``Future`` throws an ``Exception``\ . All asynchronous remote calls return ``Future``\ s and set the value to the return value of the call upon completion.

.. function:: put!(c::Channel, v)

.. Docstring generated from Julia source

Appends an item ``v`` to the channel ``c``\ . Blocks if the channel is full.

.. function:: take!(rr::RemoteChannel, args...)

.. Docstring generated from Julia source

Fetch value(s) from a remote channel, removing the value(s) in the processs.

.. function:: take!(c::Channel)

.. Docstring generated from Julia source

Removes and returns a value from a ``Channel``\ . Blocks till data is available.

.. function:: isready(c::Channel)

.. Docstring generated from Julia source

Determine whether a ``Channel`` has a value stored to it. ``isready`` on ``Channel``\ s is non-blocking.

.. function:: isready(rr::RemoteChannel, args...)

.. Docstring generated from Julia source
Expand All @@ -406,15 +428,6 @@ General Parallel Computing Support
@async put!(c, remotecall_fetch(long_computation, p))
isready(c) # will not block

.. function:: close(c::Channel)

.. Docstring generated from Julia source

Closes a channel. An exception is thrown by:

* ``put!`` on a closed channel.
* ``take!`` and ``fetch`` on an empty, closed channel.

.. function:: WorkerPool(workers)

.. Docstring generated from Julia source
Expand Down
Loading