Skip to content

Commit

Permalink
Make types and comments a bit more precise for the leak thread
Browse files Browse the repository at this point in the history
  • Loading branch information
facundominguez committed Jul 11, 2024
1 parent 01d4594 commit 5680c0b
Showing 1 changed file with 35 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -219,28 +219,28 @@ runAgainstBucket ::
(Handlers m -> m a) ->
m (State m, a)
runAgainstBucket config action = do
runThreadVar <- atomically newEmptyTMVar -- see note [Leaky bucket design].
leakingPeriodVersionTMVar <- atomically newEmptyTMVar -- see note [Leaky bucket design].
tid <- myThreadId
bucket <- init config
withAsync (leak runThreadVar tid bucket) $ \_ -> do
atomicallyWithMonotonicTime $ maybeStartThread Nothing runThreadVar bucket
withAsync (leak (readTMVar leakingPeriodVersionTMVar) tid bucket) $ \_ -> do
atomicallyWithMonotonicTime $ maybeStartThread Nothing leakingPeriodVersionTMVar bucket
result <-
action $
Handlers
{ fill = \r t -> (snd <$>) $ snapshotFill bucket r t,
setPaused = setPaused bucket,
updateConfig = updateConfig runThreadVar bucket
updateConfig = updateConfig leakingPeriodVersionTMVar bucket
}
state <- atomicallyWithMonotonicTime $ snapshot bucket
pure (state, result)
where
-- Start the thread (that is, write to its 'runThreadVar') if it is useful.
-- Takes a potential old value of the 'runThreadVar' as first argument,
-- Start the thread (that is, write to its 'leakingPeriodVersionTMVar') if it is useful.
-- Takes a potential old value of the 'leakingPeriodVersionTMVar' as first argument,
-- which will be increased to help differentiate between restarts.
maybeStartThread :: Maybe Int -> StrictTMVar m Int -> Bucket m -> Time -> STM m ()
maybeStartThread oldRunThread runThreadVar bucket time = do
maybeStartThread mLeakingPeriodVersion leakingPeriodVersionTMVar bucket time = do
State {config = Config {rate}} <- snapshot bucket time
when (rate > 0) $ void $ tryPutTMVar runThreadVar $ maybe 0 (+ 1) oldRunThread
when (rate > 0) $ void $ tryPutTMVar leakingPeriodVersionTMVar $ maybe 0 (+ 1) mLeakingPeriodVersion

setPaused :: Bucket m -> Bool -> Time -> STM m ()
setPaused bucket paused time = do
Expand All @@ -253,7 +253,7 @@ runAgainstBucket config action = do
((Rational, Config m) -> (Rational, Config m)) ->
Time ->
STM m ()
updateConfig runThreadVar bucket f time = do
updateConfig leakingPeriodVersionTMVar bucket f time = do
State
{ level = oldLevel,
paused,
Expand All @@ -272,9 +272,9 @@ runAgainstBucket config action = do
configGeneration = oldConfigGeneration + 1,
config = newConfig
}
-- Ensure that 'runThreadVar' is empty, then maybe start the thread.
oldRunThread <- tryTakeTMVar runThreadVar
maybeStartThread oldRunThread runThreadVar bucket time
-- Ensure that 'leakingPeriodVersionTMVar' is empty, then maybe start the thread.
mLeakingPeriodVersion <- tryTakeTMVar leakingPeriodVersionTMVar
maybeStartThread mLeakingPeriodVersion leakingPeriodVersionTMVar bucket time

-- | Initialise a bucket given a configuration. The bucket starts full at the
-- time where one calls 'init'.
Expand All @@ -299,7 +299,7 @@ init config@Config {capacity} = do
-- ~~~~~~~~~~~~~~~~~~~~~~~~~~
--
-- The leaky bucket works by running the given action against a thread that
-- makes the bucket leak. Since that would be extremely inefficient to actually
-- makes the bucket leak. Since it would be inefficient to actually
-- remove tokens one by one from the bucket, the 'leak' thread instead looks at
-- the current state of the bucket, computes how much time it would take for the
-- bucket to empty, and then wait that amount of time. Once the wait is over, it
Expand All @@ -317,41 +317,42 @@ init config@Config {capacity} = do
-- for the action to lower the waiting time by changing the bucket configuration
-- to one where the rate is higher.
--
-- We fix both those issues with one mechanism, the @runThreadVar@. It is an
-- MVar containing an integer that tells the thread whether it should be
-- running. An empty MVar means that the thread should not be running, for
-- instance if the rate is null. A full MVar (no matter what the integer is)
-- means that the thread should be running. When recursing, the thread blocks
-- until the MVar is full, and only then proceeds as described above.
-- We fix both those issues with one mechanism, the @leakingPeriodVersionSTM@.
-- It is a computation returning an integer that identifies a version of the
-- configuration that controls the leaking period. If the computation blocks,
-- it means that no configuration has been determined yet.
-- The leak thread first waits until @leakingPeriodVersionSTM@ yields a
-- value, and only then proceeds as described above.
-- Additionally, while waiting for the bucket to empty, the thread monitors
-- changes to the MVar, indicating either that the thread should stop running or
-- that the configuration changed as that it might have to wait less long. The
-- change in configuration is detected by changes in the integer.
-- for changes to the version of the leaking period, indicating either that the
-- thread should pause running if the @leakingPeriodVersionSTM@ starts blocking
-- again or that the configuration changed as that it might have to wait less
-- long.
--
-- Note that we call \“start\”/\“stop\” running the action of filling/emptying the
-- MVar. This is not to mistaken for the thread actually being spawned/killed.

-- | Monadic action that calls 'threadDelay' until the bucket is empty, then
-- runs the 'onEmpty' action and terminates. See note [Leaky bucket design].
-- | Neverending computation that runs 'onEmpty' whenever the bucket becomes
-- empty. See note [Leaky bucket design].
leak ::
( MonadDelay m,
MonadCatch m,
MonadFork m,
MonadAsync m,
MonadTimer m
) =>
-- | A variable indicating whether the thread should run (when it is filled)
-- or not (otherwise). The integer it carries only helps in differentiating
-- between starts and restarts. 'leak' does not modify this variable.
StrictTMVar m Int ->
-- | A computation indicating the version of the configuration affecting the
-- leaking period. Whenever the configuration changes, the returned integer
-- must be incremented. While no configuration is available, the computation
-- should block. Blocking is allowed at any time, and it will cause the
-- leaking to pause.
STM m Int ->
-- | The 'ThreadId' of the action's thread, which is used to throw exceptions
-- at it.
ThreadId m ->
Bucket m ->
m ()
leak runThreadVar actionThreadId bucket = forever $ do
-- Block until we are allowed to run. Do not modify the TMVar.
oldRunThread <- atomically $ readTMVar runThreadVar
leak leakingPeriodVersionSTM actionThreadId bucket = forever $ do
-- Block until we are allowed to run.
leakingPeriodVersion <- atomically leakingPeriodVersionSTM
-- NOTE: It is tempting to group this @atomically@ and
-- @atomicallyWithMonotonicTime@ into one; however, because the former is
-- blocking, the latter could get a _very_ inaccurate time, which we
Expand All @@ -377,7 +378,7 @@ leak runThreadVar actionThreadId bucket = forever $ do
atomically $
(check =<< TVar.readTVar varTimeout)
`orElse`
(void $ blockUntilChanged id (Just oldRunThread) $ tryReadTMVar runThreadVar)
(void $ blockUntilChanged id leakingPeriodVersion leakingPeriodVersionSTM)

-- | Take a snapshot of the bucket, that is compute its state at the current
-- time.
Expand Down

0 comments on commit 5680c0b

Please sign in to comment.