Skip to content

Commit

Permalink
Revert "ASYNC-252 Split core.async go runtime from ioc-macros namespace"
Browse files Browse the repository at this point in the history
This reverts commit 148ebe2.
  • Loading branch information
puredanger committed Sep 26, 2024
1 parent 050a331 commit bf03523
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 50 deletions.
18 changes: 14 additions & 4 deletions src/main/clojure/clojure/core/async.clj
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ to catch and handle."
[clojure.core.async.impl.buffers :as buffers]
[clojure.core.async.impl.timers :as timers]
[clojure.core.async.impl.dispatch :as dispatch]
[clojure.core.async.impl.ioc-macros :as ioc-macros] ;; only for go analyzer
[clojure.core.async.impl.runtime :as ioc]
[clojure.core.async.impl.ioc-macros :as ioc]
[clojure.core.async.impl.mutex :as mutex]
[clojure.core.async.impl.concurrent :as conc]
)
Expand Down Expand Up @@ -418,7 +417,7 @@ to catch and handle."

(defn ioc-alts! [state cont-block ports & {:as opts}]
(ioc/aset-all! state ioc/STATE-IDX cont-block)
(when-let [cb (do-alts
(when-let [cb (clojure.core.async/do-alts
(fn [val]
(ioc/aset-all! state ioc/VALUE-IDX val)
(ioc/run-state-machine-wrapped state))
Expand Down Expand Up @@ -457,7 +456,18 @@ to catch and handle."
Returns a channel which will receive the result of the body when
completed"
[& body]
(#'clojure.core.async.impl.ioc-macros/go-impl &env body))
(let [crossing-env (zipmap (keys &env) (repeatedly gensym))]
`(let [c# (chan 1)
captured-bindings# (Var/getThreadBindingFrame)]
(dispatch/run
(^:once fn* []
(let [~@(mapcat (fn [[l sym]] [sym `(^:once fn* [] ~(vary-meta l dissoc :tag))]) crossing-env)
f# ~(ioc/state-machine `(do ~@body) 1 [crossing-env &env] ioc/async-custom-terminators)
state# (-> (f#)
(ioc/aset-all! ioc/USER-START-IDX c#
ioc/BINDINGS-IDX captured-bindings#))]
(ioc/run-state-machine-wrapped state#))))
c#)))

(defonce ^:private ^Executor thread-macro-executor
(Executors/newCachedThreadPool (conc/counted-thread-factory "async-thread-macro-%d" true)))
Expand Down
152 changes: 109 additions & 43 deletions src/main/clojure/clojure/core/async/impl/ioc_macros.clj
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,39 @@
[clojure.tools.analyzer.passes.jvm.warn-on-reflection :refer [warn-on-reflection]]
[clojure.tools.analyzer.jvm :as an-jvm]
[clojure.core.async.impl.protocols :as impl]
[clojure.core.async.impl.dispatch :as dispatch]
[clojure.core.async.impl.runtime :as rt]
[clojure.set :as set])
(:import [java.util.concurrent.atomic AtomicReferenceArray]
[clojure.lang Var]))
(:import [java.util.concurrent.locks Lock]
[java.util.concurrent.atomic AtomicReferenceArray]))

(defn debug [x]
(pprint x)
x)

(def ^{:const true :tag 'long} FN-IDX 0)
(def ^{:const true :tag 'long} STATE-IDX 1)
(def ^{:const true :tag 'long} VALUE-IDX 2)
(def ^{:const true :tag 'long} BINDINGS-IDX 3)
(def ^{:const true :tag 'long} EXCEPTION-FRAMES 4)
(def ^{:const true :tag 'long} USER-START-IDX 5)

(defn aset-object [^AtomicReferenceArray arr ^long idx o]
(.set arr idx o))

(defn aget-object [^AtomicReferenceArray arr ^long idx]
(.get arr idx))

(defmacro aset-all!
[arr & more]
(assert (even? (count more)) "Must give an even number of args to aset-all!")
(let [bindings (partition 2 more)
arr-sym (gensym "statearr-")]
`(let [~arr-sym ~arr]
~@(map
(fn [[idx val]]
`(aset-object ~arr-sym ~idx ~val))
bindings)
~arr-sym)))

;; State monad stuff, used only in SSA construction

(defmacro gen-plan
Expand Down Expand Up @@ -194,7 +217,7 @@
IEmittableInstruction
(emit-instruction [this state-sym]
(if (= value ::value)
`[~(:id this) (rt/aget-object ~state-sym ~rt/VALUE-IDX)]
`[~(:id this) (aget-object ~state-sym ~VALUE-IDX)]
`[~(:id this) ~value])))

(defrecord RawCode [ast locals]
Expand Down Expand Up @@ -294,10 +317,10 @@
(terminate-block [_this state-sym _]
`(do (case ~val-id
~@(concat (mapcat (fn [test blk]
`[~test (rt/aset-all! ~state-sym ~rt/STATE-IDX ~blk)])
`[~test (aset-all! ~state-sym ~STATE-IDX ~blk)])
test-vals jmp-blocks)
(when default-block
`[(do (rt/aset-all! ~state-sym ~rt/STATE-IDX ~default-block)
`[(do (aset-all! ~state-sym ~STATE-IDX ~default-block)
:recur)])))
:recur)))

Expand Down Expand Up @@ -328,7 +351,7 @@
(block-references [_this] [block])
ITerminator
(terminate-block [_this state-sym _]
`(do (rt/aset-all! ~state-sym ~rt/VALUE-IDX ~value ~rt/STATE-IDX ~block)
`(do (aset-all! ~state-sym ~VALUE-IDX ~value ~STATE-IDX ~block)
:recur)))

(defrecord Return [value]
Expand All @@ -341,7 +364,7 @@
(terminate-block [this state-sym custom-terminators]
(if-let [f (get custom-terminators (terminator-code this))]
`(~f ~state-sym ~value)
`(do (rt/aset-all! ~state-sym ~rt/VALUE-IDX ~value ~rt/STATE-IDX ::finished)
`(do (aset-all! ~state-sym ~VALUE-IDX ~value ~STATE-IDX ::finished)
nil))))

(defrecord CondBr [test then-block else-block]
Expand All @@ -352,8 +375,8 @@
ITerminator
(terminate-block [_this state-sym _]
`(do (if ~test
(rt/aset-all! ~state-sym ~rt/STATE-IDX ~then-block)
(rt/aset-all! ~state-sym ~rt/STATE-IDX ~else-block))
(aset-all! ~state-sym ~STATE-IDX ~then-block)
(aset-all! ~state-sym ~STATE-IDX ~else-block))
:recur)))

(defrecord PushTry [catch-block]
Expand All @@ -363,7 +386,7 @@
(block-references [_this] [catch-block])
IEmittableInstruction
(emit-instruction [_this state-sym]
`[~'_ (rt/aset-all! ~state-sym ~rt/EXCEPTION-FRAMES (cons ~catch-block (rt/aget-object ~state-sym ~rt/EXCEPTION-FRAMES)))]))
`[~'_ (aset-all! ~state-sym ~EXCEPTION-FRAMES (cons ~catch-block (aget-object ~state-sym ~EXCEPTION-FRAMES)))]))

(defrecord PopTry []
IInstruction
Expand All @@ -372,7 +395,7 @@
(block-references [_this] [])
IEmittableInstruction
(emit-instruction [_this state-sym]
`[~'_ (rt/aset-all! ~state-sym ~rt/EXCEPTION-FRAMES (rest (rt/aget-object ~state-sym ~rt/EXCEPTION-FRAMES)))]))
`[~'_ (aset-all! ~state-sym ~EXCEPTION-FRAMES (rest (aget-object ~state-sym ~EXCEPTION-FRAMES)))]))

(defrecord CatchHandler [catches]
IInstruction
Expand All @@ -382,10 +405,10 @@
ITerminator
(terminate-block [_this state-sym _]
(let [ex (gensym 'ex)]
`(let [~ex (rt/aget-object ~state-sym ~rt/VALUE-IDX)]
`(let [~ex (aget-object ~state-sym ~VALUE-IDX)]
(cond
~@(for [[handler-idx type] catches
i [`(instance? ~type ~ex) `(rt/aset-all! ~state-sym ~rt/STATE-IDX ~handler-idx)]]
i [`(instance? ~type ~ex) `(aset-all! ~state-sym ~STATE-IDX ~handler-idx)]]
i)
:else (throw ~ex))
:recur))))
Expand Down Expand Up @@ -865,7 +888,7 @@
(if (empty? args)
[]
(mapcat (fn [sym]
`[~sym (rt/aget-object ~state-sym ~(id-for-inst local-map sym))])
`[~sym (aget-object ~state-sym ~(id-for-inst local-map sym))])
args))))

(defn- build-block-body [state-sym blk]
Expand All @@ -882,27 +905,27 @@
blk)
results (interleave (map (partial id-for-inst local-map) results) results)]
(if-not (empty? results)
[state-sym `(rt/aset-all! ~state-sym ~@results)]
[state-sym `(aset-all! ~state-sym ~@results)]
[])))

(defn- emit-state-machine [machine num-user-params custom-terminators]
(let [index (index-state-machine machine)
state-sym (with-meta (gensym "state_")
{:tag 'objects})
local-start-idx (+ num-user-params rt/USER-START-IDX)
local-start-idx (+ num-user-params USER-START-IDX)
state-arr-size (+ local-start-idx (count-persistent-values index))
local-map (atom {::next-idx local-start-idx})
block-catches (:block-catches machine)]
`(fn state-machine#
([] (rt/aset-all! (AtomicReferenceArray. ~state-arr-size)
~rt/FN-IDX state-machine#
~rt/STATE-IDX ~(:start-block machine)))
([] (aset-all! (AtomicReferenceArray. ~state-arr-size)
~FN-IDX state-machine#
~STATE-IDX ~(:start-block machine)))
([~state-sym]
(let [old-frame# (clojure.lang.Var/getThreadBindingFrame)
ret-value# (try
(clojure.lang.Var/resetThreadBindingFrame (rt/aget-object ~state-sym ~rt/BINDINGS-IDX))
(clojure.lang.Var/resetThreadBindingFrame (aget-object ~state-sym ~BINDINGS-IDX))
(loop []
(let [result# (case (int (rt/aget-object ~state-sym ~rt/STATE-IDX))
(let [result# (case (int (aget-object ~state-sym ~STATE-IDX))
~@(mapcat
(fn [[id blk]]
[id `(let [~@(concat (build-block-preamble local-map index state-sym blk)
Expand All @@ -914,18 +937,77 @@
(recur)
result#)))
(catch Throwable ex#
(rt/aset-all! ~state-sym ~rt/VALUE-IDX ex#)
(if (seq (rt/aget-object ~state-sym ~rt/EXCEPTION-FRAMES))
(rt/aset-all! ~state-sym ~rt/STATE-IDX (first (rt/aget-object ~state-sym ~rt/EXCEPTION-FRAMES)))
(aset-all! ~state-sym ~VALUE-IDX ex#)
(if (seq (aget-object ~state-sym ~EXCEPTION-FRAMES))
(aset-all! ~state-sym ~STATE-IDX (first (aget-object ~state-sym ~EXCEPTION-FRAMES)))
(throw ex#))
:recur)
(finally
(rt/aset-object ~state-sym ~rt/BINDINGS-IDX (clojure.lang.Var/getThreadBindingFrame))
(aset-object ~state-sym ~BINDINGS-IDX (clojure.lang.Var/getThreadBindingFrame))
(clojure.lang.Var/resetThreadBindingFrame old-frame#)))]
(if (identical? ret-value# :recur)
(recur ~state-sym)
ret-value#))))))

(defn finished?
"Returns true if the machine is in a finished state"
[state-array]
(identical? (aget-object state-array STATE-IDX) ::finished))

(defn- fn-handler
[f]
(reify
Lock
(lock [_])
(unlock [_])

impl/Handler
(active? [_] true)
(blockable? [_] true)
(lock-id [_] 0)
(commit [_] f)))


(defn run-state-machine [state]
((aget-object state FN-IDX) state))

(defn run-state-machine-wrapped [state]
(try
(run-state-machine state)
(catch Throwable ex
(impl/close! (aget-object state USER-START-IDX))
(throw ex))))

(defn take! [state blk c]
(if-let [cb (impl/take! c (fn-handler
(fn [x]
(aset-all! state VALUE-IDX x STATE-IDX blk)
(run-state-machine-wrapped state))))]
(do (aset-all! state VALUE-IDX @cb STATE-IDX blk)
:recur)
nil))

(defn put! [state blk c val]
(if-let [cb (impl/put! c val (fn-handler (fn [ret-val]
(aset-all! state VALUE-IDX ret-val STATE-IDX blk)
(run-state-machine-wrapped state))))]
(do (aset-all! state VALUE-IDX @cb STATE-IDX blk)
:recur)
nil))

(defn return-chan [state value]
(let [c (aget-object state USER-START-IDX)]
(when-not (nil? value)
(impl/put! c value (fn-handler (fn [_] nil))))
(impl/close! c)
c))

(def async-custom-terminators
{'clojure.core.async/<! `take!
'clojure.core.async/>! `put!
'clojure.core.async/alts! 'clojure.core.async/ioc-alts!
:Return `return-chan})

(defn mark-transitions
{:pass-info {:walk :post :depends #{} :after an-jvm/default-passes}}
[{:keys [op fn] :as ast}]
Expand Down Expand Up @@ -1028,19 +1110,3 @@
(parse-to-state-machine user-transitions)
second
(emit-state-machine num-user-params user-transitions))))

(defn go-impl
[env body]
(let [crossing-env (zipmap (keys env) (repeatedly gensym))]
`(let [c# (clojure.core.async/chan 1)
captured-bindings# (Var/getThreadBindingFrame)]
(dispatch/run
(^:once fn* []
(let [~@(mapcat (fn [[l sym]] [sym `(^:once fn* [] ~(vary-meta l dissoc :tag))]) crossing-env)
f# ~(state-machine
`(do ~@body) 1 [crossing-env env] rt/async-custom-terminators)
state# (-> (f#)
(rt/aset-all! rt/USER-START-IDX c#
rt/BINDINGS-IDX captured-bindings#))]
(rt/run-state-machine-wrapped state#))))
c#)))
6 changes: 3 additions & 3 deletions src/test/clojure/clojure/core/async/ioc_macros_test.clj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
(ns clojure.core.async.ioc-macros-test
(:refer-clojure :exclude [map into reduce transduce merge take partition
partition-by])
(:require [clojure.core.async.impl.runtime :as ioc]
(:require [clojure.core.async.impl.ioc-macros :as ioc]
[clojure.core.async :refer :all :as async]
[clojure.set :as set]
[clojure.test :refer :all])
Expand All @@ -24,7 +24,7 @@
crossing-env (zipmap (keys &env) (repeatedly gensym))]
`(let [captured-bindings# (clojure.lang.Var/getThreadBindingFrame)
~@(mapcat (fn [[l sym]] [sym `(^:once fn* [] ~l)]) crossing-env)
state# (~(clojure.core.async.impl.ioc-macros/state-machine `(do ~@body) 0 [crossing-env &env] terminators))]
state# (~(ioc/state-machine `(do ~@body) 0 [crossing-env &env] terminators))]
(ioc/aset-all! state# ~ioc/BINDINGS-IDX captured-bindings#)
(ioc/run-state-machine state#)
(ioc/aget-object state# ioc/VALUE-IDX))))
Expand Down Expand Up @@ -553,7 +553,7 @@
crossing-env (zipmap (keys &env) (repeatedly gensym))]
`(let [captured-bindings# (clojure.lang.Var/getThreadBindingFrame)
~@(mapcat (fn [[l sym]] [sym `(^:once fn* [] ~(vary-meta l dissoc :tag))]) crossing-env)
state# (~(clojure.core.async.impl.ioc-macros/state-machine
state# (~(ioc/state-machine
`(do ~@body)
0
[crossing-env &env]
Expand Down

0 comments on commit bf03523

Please sign in to comment.