From e23f4e2ebbf2770bec5846339089c798b4eab2af Mon Sep 17 00:00:00 2001 From: Amit Murthy Date: Fri, 7 Oct 2016 14:42:04 +0530 Subject: [PATCH] Channel constructor requires an explict size. Move channel tests into its own file. Implement 0-sized channels. --- base/channels.jl | 102 +++++++++++++++++++++++++++++++++------ base/deprecated.jl | 3 ++ base/docs/helpdb/Base.jl | 11 +++-- base/event.jl | 1 + base/multi.jl | 1 - doc/stdlib/parallel.rst | 75 ++++++++++++++++------------ test/channels.jl | 90 ++++++++++++++++++++++++++++++++++ test/choosetests.jl | 3 +- test/parallel_exec.jl | 62 +----------------------- 9 files changed, 236 insertions(+), 112 deletions(-) create mode 100644 test/channels.jl diff --git a/base/channels.jl b/base/channels.jl index cf932d2c55bac..2648df4403eb5 100644 --- a/base/channels.jl +++ b/base/channels.jl @@ -2,26 +2,49 @@ abstract AbstractChannel -const DEF_CHANNEL_SZ=32 - type Channel{T} <: AbstractChannel cond_take::Condition # waiting for data to become available cond_put::Condition # waiting for a writeable slot state::Symbol data::Array{T,1} - sz_max::Int # maximum size of channel + sz_max::Int # maximum size of channel + + # Used when sz_max == 0, i.e., an unbuffered channel. + takers::Array{Condition} - function Channel(sz) - sz_max = sz == typemax(Int) ? typemax(Int) - 1 : sz - new(Condition(), Condition(), :open, Array{T}(0), sz_max) + function Channel(sz::Float64) + if sz == Inf + Channel{T}(typemax(Int)) + else + Channel{T}(convert(Int, sz)) + end + end + function Channel(sz::Integer) + if sz < 0 + throw(ArgumentError("Channel size must be either 0, a positive integer or Inf")) + end + new(Condition(), Condition(), :open, Array{T}(0), sz, Array{Condition}(0)) + end + + # deprecated empty constructor + function Channel() + depwarn(string("The empty constructor Channel() is deprecated. ", + "The channel size needs to be specified explictly. ", + "Defaulting to Channel{$T}(32)."), :Channel) + Channel(32) end end -Channel(sz::Int = DEF_CHANNEL_SZ) = Channel{Any}(sz) +Channel(sz) = Channel{Any}(sz) + +# deprecated empty constructor +Channel() = Channel{Any}() closed_exception() = InvalidStateException("Channel is closed.", :closed) +isbuffered(c::Channel) = c.sz_max==0 ? false : true + """ close(c::Channel) @@ -46,9 +69,16 @@ end put!(c::Channel, v) Appends an item `v` to the channel `c`. Blocks if the channel is full. + +For unbuffered channels, blocks until a `take!` is performed by a different +task. """ function put!(c::Channel, v) !isopen(c) && throw(closed_exception()) + isbuffered(c) ? put_buffered(c,v) : put_unbuffered(c,v) +end + +function put_buffered(c::Channel, v) while length(c.data) == c.sz_max wait(c.cond_put) end @@ -57,19 +87,42 @@ function put!(c::Channel, v) v end +function put_unbuffered(c::Channel, v) + while length(c.takers) == 0 + notify(c.cond_take, nothing, true, false) # Required to handle wait() on 0-sized channels + wait(c.cond_put) + end + cond_taker = shift!(c.takers) + notify(cond_taker, v, false, false) + v +end + push!(c::Channel, v) = put!(c, v) -function fetch(c::Channel) +""" + fetch(c::Channel) + +Waits for and gets the first available item from the channel. Does not +remove the item. `fetch` is unsupported on an unbuffered (0-size) channel. +""" +fetch(c::Channel) = isbuffered(c) ? fetch_buffered(c) : fetch_unbuffered(c) +function fetch_buffered(c::Channel) wait(c) c.data[1] end +fetch_unbuffered(c::Channel) = throw(ErrorException("`fetch` is not supported on an unbuffered Channel.")) + """ take!(c::Channel) -Removes and returns a value from a `Channel`. Blocks till data is available. +Removes and returns a value from a `Channel`. Blocks until data is available. + +For unbuffered channels, blocks until a `put!` is performed by a different +task. """ -function take!(c::Channel) +take!(c::Channel) = isbuffered(c) ? take_buffered(c) : take_unbuffered(c) +function take_buffered(c::Channel) wait(c) v = shift!(c.data) notify(c.cond_put, nothing, false, false) # notify only one, since only one slot has become available for a put!. @@ -78,13 +131,35 @@ end shift!(c::Channel) = take!(c) +# 0-size channel +function take_unbuffered(c::Channel) + !isopen(c) && throw(closed_exception()) + cond_taker = Condition() + push!(c.takers, cond_taker) + notify(c.cond_put, nothing, false, false) + try + return wait(cond_taker) + catch e + if isa(e, InterruptException) + # remove self from the list of takers + filter!(x -> x != cond_taker, c.takers) + else + rethrow(e) + end + end +end + """ isready(c::Channel) -Determine whether a `Channel` has a value stored to it. -`isready` on `Channel`s is non-blocking. +Determine whether a `Channel` has a value stored to it. Returns +immediately, does not block. + +For unbuffered channels returns `true` if there are tasks waiting +on a `put!`. """ isready(c::Channel) = n_avail(c) > 0 +n_avail(c::Channel) = isbuffered(c) ? length(c.data) : n_waiters(c.cond_put) function wait(c::Channel) while !isready(c) @@ -97,12 +172,11 @@ end function notify_error(c::Channel, err) notify_error(c.cond_take, err) notify_error(c.cond_put, err) + foreach(x->notify_error(x, err), c.takers) end eltype{T}(::Type{Channel{T}}) = T -n_avail(c::Channel) = length(c.data) - show(io::IO, c::Channel) = print(io, "$(typeof(c))(sz_max:$(c.sz_max),sz_curr:$(n_avail(c)))") start{T}(c::Channel{T}) = Ref{Nullable{T}}() diff --git a/base/deprecated.jl b/base/deprecated.jl index a22b7add42146..41c2bcc038981 100644 --- a/base/deprecated.jl +++ b/base/deprecated.jl @@ -1033,4 +1033,7 @@ end)) @deprecate_binding cycle Iterators.cycle @deprecate_binding repeated Iterators.repeated +# NOTE: Deprecation of Channel{T}() is implemented in channels.jl. +# To be removed from there when 0.6 deprecations are removed. + # End deprecations scheduled for 0.6 diff --git a/base/docs/helpdb/Base.jl b/base/docs/helpdb/Base.jl index bb6dc586cb41a..0e370afc6af63 100644 --- a/base/docs/helpdb/Base.jl +++ b/base/docs/helpdb/Base.jl @@ -1426,13 +1426,16 @@ endof """ Channel{T}(sz::Int) -Constructs a `Channel` that can hold a maximum of `sz` objects of type `T`. `put!` calls on -a full channel block till an object is removed with `take!`. +Constructs a `Channel` with an internal buffer that can hold a maximum of `sz` objects +of type `T`. `put!` calls on a full channel block until an object is removed with `take!`. + +`Channel(0)` constructs an unbuffered channel. `put!` blocks until a matching `take!` is called. +And vice-versa. Other constructors: -- `Channel()` - equivalent to `Channel{Any}(32)` -- `Channel(sz::Int)` equivalent to `Channel{Any}(sz)` +- `Channel(Inf)` - equivalent to `Channel{Any}(typemax(Int))` +- `Channel(sz)` equivalent to `Channel{Any}(sz)` """ Channel diff --git a/base/event.jl b/base/event.jl index 3970db83171b3..2d99c13ac20dd 100644 --- a/base/event.jl +++ b/base/event.jl @@ -54,6 +54,7 @@ end notify_error(c::Condition, err) = notify(c, err, true, true) +n_waiters(c::Condition) = length(c.waitq) # schedule an expression to run asynchronously, with minimal ceremony """ diff --git a/base/multi.jl b/base/multi.jl index aac88963e75c3..95e50679b7725 100644 --- a/base/multi.jl +++ b/base/multi.jl @@ -1175,7 +1175,6 @@ Waits and fetches a value from `x` depending on the type of `x`. Does not remove is an exception, throws a `RemoteException` which captures the remote exception and backtrace. * `RemoteChannel`: Wait for and get the value of a remote reference. Exceptions raised are same as for a `Future` . -* `Channel` : Wait for and get the first available item from the channel. """ fetch(x::ANY) = x diff --git a/doc/stdlib/parallel.rst b/doc/stdlib/parallel.rst index 33dfe5bcddae1..66c8aa0737f47 100644 --- a/doc/stdlib/parallel.rst +++ b/doc/stdlib/parallel.rst @@ -115,12 +115,53 @@ Tasks .. Docstring generated from Julia source - Constructs a ``Channel`` that can hold a maximum of ``sz`` objects of type ``T``\ . ``put!`` calls on a full channel block till an object is removed with ``take!``\ . + Constructs a ``Channel`` with an internal buffer that can hold a maximum of ``sz`` objects of type ``T``\ . ``put!`` calls on a full channel block until an object is removed with ``take!``\ . + + ``Channel(0)`` constructs an unbuffered channel. ``put!`` blocks until a matching ``take!`` is called. And vice-versa. Other constructors: - * ``Channel()`` - equivalent to ``Channel{Any}(32)`` - * ``Channel(sz::Int)`` equivalent to ``Channel{Any}(sz)`` + * ``Channel(Inf)`` - equivalent to ``Channel{Any}(typemax(Int))`` + * ``Channel(sz)`` equivalent to ``Channel{Any}(sz)`` + +.. function:: put!(c::Channel, v) + + .. Docstring generated from Julia source + + Appends an item ``v`` to the channel ``c``\ . Blocks if the channel is full. + + For unbuffered channels, blocks until a ``take!`` is performed by a different task. + +.. function:: take!(c::Channel) + + .. Docstring generated from Julia source + + Removes and returns a value from a ``Channel``\ . Blocks until data is available. + + For unbuffered channels, blocks until a ``put!`` is performed by a different task. + +.. function:: isready(c::Channel) + + .. Docstring generated from Julia source + + Determine whether a ``Channel`` has a value stored to it. Returns immediately, does not block. + + For unbuffered channels returns ``true`` if there are tasks waiting on a ``put!``\ . + +.. function:: fetch(c::Channel) + + .. Docstring generated from Julia source + + Waits for and gets the first available item from the channel. Does not remove the item. ``fetch`` is unsupported on an unbuffered (0-size) channel. + +.. function:: close(c::Channel) + + .. Docstring generated from Julia source + + Closes a channel. An exception is thrown by: + + * ``put!`` on a closed channel. + * ``take!`` and ``fetch`` on an empty, closed channel. General Parallel Computing Support ---------------------------------- @@ -336,7 +377,6 @@ General Parallel Computing Support * ``Future``\ : Wait for and get the value of a Future. The fetched value is cached locally. Further calls to ``fetch`` on the same reference return the cached value. If the remote value is an exception, throws a ``RemoteException`` which captures the remote exception and backtrace. * ``RemoteChannel``\ : Wait for and get the value of a remote reference. Exceptions raised are same as for a ``Future`` . - * ``Channel`` : Wait for and get the first available item from the channel. .. function:: remotecall_wait(f, id::Integer, args...; kwargs...) @@ -362,30 +402,12 @@ General Parallel Computing Support Store a value to a ``Future`` ``rr``\ . ``Future``\ s are write-once remote references. A ``put!`` on an already set ``Future`` throws an ``Exception``\ . All asynchronous remote calls return ``Future``\ s and set the value to the return value of the call upon completion. -.. function:: put!(c::Channel, v) - - .. Docstring generated from Julia source - - Appends an item ``v`` to the channel ``c``\ . Blocks if the channel is full. - .. function:: take!(rr::RemoteChannel, args...) .. Docstring generated from Julia source Fetch value(s) from a remote channel, removing the value(s) in the processs. -.. function:: take!(c::Channel) - - .. Docstring generated from Julia source - - Removes and returns a value from a ``Channel``\ . Blocks till data is available. - -.. function:: isready(c::Channel) - - .. Docstring generated from Julia source - - Determine whether a ``Channel`` has a value stored to it. ``isready`` on ``Channel``\ s is non-blocking. - .. function:: isready(rr::RemoteChannel, args...) .. Docstring generated from Julia source @@ -406,15 +428,6 @@ General Parallel Computing Support @async put!(c, remotecall_fetch(long_computation, p)) isready(c) # will not block -.. function:: close(c::Channel) - - .. Docstring generated from Julia source - - Closes a channel. An exception is thrown by: - - * ``put!`` on a closed channel. - * ``take!`` and ``fetch`` on an empty, closed channel. - .. function:: WorkerPool(workers) .. Docstring generated from Julia source diff --git a/test/channels.jl b/test/channels.jl new file mode 100644 index 0000000000000..620b433b2795c --- /dev/null +++ b/test/channels.jl @@ -0,0 +1,90 @@ +# This file is a part of Julia. License is MIT: http://julialang.org/license + +# Test various constructors +c=Channel(1) +@test eltype(c) == Any +@test put!(c, 1) == 1 +@test isready(c) == true +@test take!(c) == 1 +@test isready(c) == false + +@test eltype(Channel(1.0)) == Any + +c=Channel{Int}(1) +@test eltype(c) == Int +@test_throws MethodError put!(c, "Hello") + +c=Channel{Int}(Inf) +@test eltype(c) == Int +pvals = map(i->put!(c,i), 1:10^6) +tvals = Int[take!(c) for i in 1:10^6] +@test pvals == tvals + +# Uncomment line below once deprecation support has been removed. +# @test_throws MethodError Channel() + +@test_throws ArgumentError Channel(-1) +@test_throws InexactError Channel(1.5) + +# Test multiple concurrent put!/take! on a channel for different sizes +function testcpt(sz) + c = Channel{Int}(sz) + size = 0 + inc() = size += 1 + dec() = size -= 1 + @sync for i = 1:10^4 + @async (sleep(rand()); put!(c, i); inc()) + @async (sleep(rand()); take!(c); dec()) + end + @test size == 0 +end +testcpt(0) +testcpt(1) +testcpt(32) +testcpt(Inf) + +# Test multiple "for" loops waiting on the same channel which +# is closed after adding a few elements. +c=Channel(32) +results=[] +@sync begin + for i in 1:20 + @async for i in c + push!(results, i) + end + end + sleep(1.0) + for i in 1:5 + put!(c,i) + end + close(c) +end +@test sum(results) == 15 + +# Testing timedwait on multiple channels +@sync begin + rr1 = Channel(1) + rr2 = Channel(1) + rr3 = Channel(1) + + callback() = all(map(isready, [rr1, rr2, rr3])) + # precompile functions which will be tested for execution time + @test !callback() + @test timedwait(callback, 0.0) === :timed_out + + @async begin sleep(0.5); put!(rr1, :ok) end + @async begin sleep(1.0); put!(rr2, :ok) end + @async begin sleep(2.0); put!(rr3, :ok) end + + tic() + timedwait(callback, 1.0) + et=toq() + # assuming that 0.5 seconds is a good enough buffer on a typical modern CPU + try + @test (et >= 1.0) && (et <= 1.5) + @test !isready(rr3) + catch + warn("timedwait tests delayed. et=$et, isready(rr3)=$(isready(rr3))") + end + @test isready(rr1) +end diff --git a/test/choosetests.jl b/test/choosetests.jl index ffe4a1d96eb57..5a377a54d1371 100644 --- a/test/choosetests.jl +++ b/test/choosetests.jl @@ -33,7 +33,8 @@ function choosetests(choices = []) "markdown", "base64", "serialize", "misc", "threads", "enums", "cmdlineargs", "i18n", "workspace", "libdl", "int", "checked", "intset", "floatfuncs", "compile", "parallel", "inline", - "boundscheck", "error", "ambiguous", "cartesian", "asmvariant" + "boundscheck", "error", "ambiguous", "cartesian", "asmvariant", + "channels" ] profile_skipped = false if startswith(string(Sys.ARCH), "arm") diff --git a/test/parallel_exec.jl b/test/parallel_exec.jl index b646a1a4270aa..fa3100c5fe480 100644 --- a/test/parallel_exec.jl +++ b/test/parallel_exec.jl @@ -512,66 +512,6 @@ workloads = Int[sum(ids .== i) for i in 2:nprocs()] # @parallel reduction should work even with very short ranges @test @parallel(+, for i=1:2; i; end) == 3 -# Testing timedwait on multiple channels -@sync begin - rr1 = Channel() - rr2 = Channel() - rr3 = Channel() - - callback() = all(map(isready, [rr1, rr2, rr3])) - # precompile functions which will be tested for execution time - @test !callback() - @test timedwait(callback, 0.0) === :timed_out - - @async begin sleep(0.5); put!(rr1, :ok) end - @async begin sleep(1.0); put!(rr2, :ok) end - @async begin sleep(2.0); put!(rr3, :ok) end - - tic() - timedwait(callback, 1.0) - et=toq() - # assuming that 0.5 seconds is a good enough buffer on a typical modern CPU - try - @test (et >= 1.0) && (et <= 1.5) - @test !isready(rr3) - catch - warn("timedwait tests delayed. et=$et, isready(rr3)=$(isready(rr3))") - end - @test isready(rr1) -end - -# Test multiple concurrent put!/take! on a channel -function testcpt() - c = Channel() - size = 0 - inc() = size += 1 - dec() = size -= 1 - @sync for i = 1:10^4 - @async (sleep(rand()); put!(c, i); inc()) - @async (sleep(rand()); take!(c); dec()) - end - @test size == 0 -end -testcpt() - -# Test multiple "for" loops waiting on the same channel which -# is closed after adding a few elements. -c=Channel() -results=[] -@sync begin - for i in 1:20 - @async for i in c - push!(results, i) - end - end - sleep(1.0) - for i in 1:5 - put!(c,i) - end - close(c) -end -@test sum(results) == 15 - @test_throws ArgumentError sleep(-1) @test_throws ArgumentError timedwait(()->false, 0.1, pollint=-0.5) @@ -593,7 +533,7 @@ num_small_requests = 10000 # test parallel sends of large arrays from multiple tasks to the same remote worker ntasks = 10 -rr_list = [Channel() for x in 1:ntasks] +rr_list = [Channel(32) for x in 1:ntasks] a = ones(2*10^5) for rr in rr_list @async let rr=rr