Skip to content

Commit

Permalink
Fix tentative followers
Browse files Browse the repository at this point in the history
The idea is to change the instruction-emitting parts of the `Follower`
abstraction to be aware of the tentative chain. Concretely, we now use
`getCurrentChainByType` instead of `readTVar cdbChain` in two
occasions:

  1. When we are rolling forward from an in-memory point, which is the
     most relevant case for pipelining (as pipelining is only relevant
     when the follower is at the tip of our chain).

     This change is crucial: If we erroneously would use the selected
     chain in a tentative follower, rolling forward from the tentative
     point would result in us jumping back to the immutable DB as the
     tentative header is never part of the selected chain, which would
     then trigger an error as the tentative header is never in the
     immutable DB.

  2. When rolling forward from a point in the immutable DB, when we
     are about to switch to the in-memory fragment. In most cases, the
     choice will be irrelevant here, except for this rare
     scenario (data loss in the VolDB): `cdbChain` could be empty, and
     the tentative header could be set, in which case we can roll
     forward tentative followers which are at the tip of the immutable
     DB.

We leave follower forwarding (triggered via `MsgFindInteresct`)
untouched as pipelining is only concerned with up-to-date peers.
  • Loading branch information
amesgen committed Apr 12, 2022
1 parent bb78cfc commit 4fa8cda
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ initialChainSelection immutableDB volatileDB lgrDB tracer cfg varInvalid
candidates) $
assert (all (preferAnchoredCandidate bcfg curChain) candidates) $ do
cse <- chainSelEnv
fmap fst <$> chainSelection cse (Diff.extend <$> candidates)
chainSelection cse (Diff.extend <$> candidates)
where
curChain = VF.validatedFragment curChainAndLedger
ledger = VF.validatedLedger curChainAndLedger
Expand Down Expand Up @@ -592,11 +592,10 @@ chainSelectionForBlock cdb@CDB{..} blockCache hdr punish = do
chainSelection chainSelEnv chainDiffs' >>= \case
Nothing ->
return curTip
Just (validatedChainDiff, tentativeChainType) ->
Just validatedChainDiff ->
switchTo
validatedChainDiff
(varTentativeHeader chainSelEnv)
tentativeChainType
AddedToCurrentChain
where
chainSelEnv = mkChainSelEnv curChainAndLedger
Expand Down Expand Up @@ -654,11 +653,10 @@ chainSelectionForBlock cdb@CDB{..} blockCache hdr punish = do
chainSelection chainSelEnv chainDiffs' >>= \case
Nothing ->
return curTip
Just (validatedChainDiff, tentativeChainType) ->
Just validatedChainDiff ->
switchTo
validatedChainDiff
(varTentativeHeader chainSelEnv)
tentativeChainType
SwitchedToAFork
where
chainSelEnv = mkChainSelEnv curChainAndLedger
Expand Down Expand Up @@ -711,9 +709,6 @@ chainSelectionForBlock cdb@CDB{..} blockCache hdr punish = do
-- ^ Chain and ledger to switch to
-> StrictTVar m (Maybe (Header blk))
-- ^ Tentative header
-> ChainType
-- ^ Whether the tentative followers already track the chain we are
-- switching to. Also see the return type of 'chainSelection'.
-> ( [LedgerEvent blk]
-> NewTipInfo blk
-> AnchoredFragment (Header blk)
Expand All @@ -723,7 +718,7 @@ chainSelectionForBlock cdb@CDB{..} blockCache hdr punish = do
-- ^ Given the 'NewTipInfo', the previous chain, and the new chain,
-- return the event to trace when we switched to the new chain.
-> m (Point blk)
switchTo vChainDiff varTentativeHeader tentativeChainType mkTraceEvent = do
switchTo vChainDiff varTentativeHeader mkTraceEvent = do
(curChain, newChain, events) <- atomically $ do
curChain <- readTVar cdbChain -- Not Query.getCurrentChain!
curLedger <- LgrDB.getCurrent cdbLgrDB
Expand Down Expand Up @@ -752,14 +747,7 @@ chainSelectionForBlock cdb@CDB{..} blockCache hdr punish = do
let ipoint = castPoint $ Diff.getAnchorPoint chainDiff
followerHandles <- Map.elems <$> readTVar cdbFollowers
forM_ followerHandles $ \followerHandle ->
-- When the follower is tentative and this chainDiff was
-- pipelined (so if tentChainType is tentative), then this
-- follower is already up-to-date and must not be modified
unless
( fhChainType followerHandle == TentativeChain
&& tentativeChainType == TentativeChain
)
(fhSwitchFork followerHandle ipoint newChain)
fhSwitchFork followerHandle ipoint newChain

return (curChain, newChain, events)

Expand Down Expand Up @@ -859,14 +847,10 @@ chainSelection
)
=> ChainSelEnv m blk
-> NonEmpty (ChainDiff (Header blk))
-> m (Maybe (ValidatedChainDiff (Header blk) (LedgerDB' blk), ChainType))
-> m (Maybe (ValidatedChainDiff (Header blk) (LedgerDB' blk)))
-- ^ The (valid) chain diff and corresponding LedgerDB that was selected,
-- or 'Nothing' if there is no valid chain diff preferred over the current
-- chain.
--
-- Also, we return the 'ChainType' which indicates whether the tentative
-- followers already track the about-to-be-selected 'TentativeChain', or
-- still the currently 'SelectedChain'.
chainSelection chainSelEnv chainDiffs =
assert (all (preferAnchoredCandidate bcfg curChain . Diff.getSuffix)
chainDiffs) $
Expand All @@ -892,7 +876,7 @@ chainSelection chainSelEnv chainDiffs =
-- [Ouroboros] below.
go ::
[ChainDiff (Header blk)]
-> m (Maybe (ValidatedChainDiff (Header blk) (LedgerDB' blk), ChainType))
-> m (Maybe (ValidatedChainDiff (Header blk) (LedgerDB' blk)))
go [] = return Nothing
go (candidate:candidates0) = do
mTentativeHeader <- setTentativeHeader
Expand All @@ -906,10 +890,8 @@ chainSelection chainSelEnv chainDiffs =
go (sortCandidates candidates1)
FullyValid validatedCandidate@(ValidatedChainDiff candidate' _) ->
-- The entire candidate is valid
assert (Diff.getTip candidate == Diff.getTip candidate') $ do
let tentativeChainType =
if isJust mTentativeHeader then TentativeChain else SelectedChain
return $ Just (validatedCandidate, tentativeChainType)
assert (Diff.getTip candidate == Diff.getTip candidate') $
return $ Just validatedCandidate
ValidPrefix candidate' -> do
whenJust mTentativeHeader clearTentativeHeader
-- Prefix of the candidate because it contained rejected blocks
Expand All @@ -935,29 +917,31 @@ chainSelection chainSelEnv chainDiffs =
-- | Set and return the tentative header, if applicable. Also, notify
-- the tentative followers.
setTentativeHeader :: m (Maybe (Header blk))
setTentativeHeader = atomically $ do
setTentativeHeader = do
mTentativeHeader <-
(\ts -> isPipelineable bcfg ts candidate)
<$> readTVar varTentativeState
<$> readTVarIO varTentativeState
whenJust mTentativeHeader $ \tentativeHeader -> do
writeTVar varTentativeHeader $ Just $! tentativeHeader
let tentativeChain = curChain AF.:> tentativeHeader
forTentativeFollowers $ \followerHandle ->
fhSwitchFork followerHandle curTipPoint tentativeChain
atomically $
writeTVar varTentativeHeader $ Just $! tentativeHeader
-- As we are only extending the existing chain, the intersection
-- point is not receding, in which case fhSwitchFork is not
-- necessary.
pure mTentativeHeader

-- | Clear a tentative header that turned out to be invalid. Also, roll
-- back the tentative followers.
clearTentativeHeader :: Header blk -> m ()
clearTentativeHeader tentativeHeader = atomically $ do
writeTVar varTentativeHeader Nothing
writeTVar varTentativeState $
LastInvalidTentative (selectView bcfg tentativeHeader)
forTentativeFollowers $ \followerHandle ->
fhSwitchFork followerHandle curTipPoint curChain

curTipPoint = castPoint $ AF.headPoint curChain
forTentativeFollowers f = getTentativeFollowers >>= mapM_ f
clearTentativeHeader tentativeHeader = do
atomically $ do
writeTVar varTentativeHeader Nothing
writeTVar varTentativeState $
LastInvalidTentative (selectView bcfg tentativeHeader)
forTentativeFollowers $ \followerHandle -> do
let curTipPoint = castPoint $ AF.headPoint curChain
fhSwitchFork followerHandle curTipPoint curChain
where
forTentativeFollowers f = getTentativeFollowers >>= mapM_ f

-- | Truncate the given (remaining) candidates that contain rejected
-- blocks. Discard them if they are truncated so much that they are no
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import Ouroboros.Consensus.Util.ResourceRegistry (ResourceRegistry)
import Ouroboros.Consensus.Util.STM (blockUntilJust)

import Ouroboros.Consensus.Storage.ChainDB.API (BlockComponent (..),
ChainDbError (..), ChainType, Follower (..), getPoint)
ChainDbError (..), ChainType (..), Follower (..), getPoint)
import qualified Ouroboros.Consensus.Storage.ChainDB.Impl.Query as Query
import Ouroboros.Consensus.Storage.ChainDB.Impl.Types
import qualified Ouroboros.Consensus.Storage.ImmutableDB as ImmutableDB
Expand Down Expand Up @@ -98,7 +98,8 @@ newFollower h registry chainType blockComponent = getEnv h $ \CDB{..} -> do
varFollower <- newTVarIO FollowerInit
let followerHandle = mkFollowerHandle varFollower
atomically $ modifyTVar cdbFollowers $ Map.insert followerKey followerHandle
let follower = makeNewFollower h followerKey varFollower registry blockComponent
let follower =
makeNewFollower h followerKey varFollower chainType registry blockComponent
traceWith cdbTracer $ TraceFollowerEvent NewFollower
return follower
where
Expand Down Expand Up @@ -127,19 +128,20 @@ makeNewFollower ::
=> ChainDbHandle m blk
-> FollowerKey
-> StrictTVar m (FollowerState m blk b)
-> ChainType
-> ResourceRegistry m
-> BlockComponent blk b
-> Follower m blk b
makeNewFollower h followerKey varFollower registry blockComponent = Follower {..}
makeNewFollower h followerKey varFollower chainType registry blockComponent = Follower {..}
where
followerInstruction :: m (Maybe (ChainUpdate blk b))
followerInstruction = getFollower h followerKey $
instructionHelper registry varFollower blockComponent id
instructionHelper registry varFollower chainType blockComponent id

followerInstructionBlocking :: m (ChainUpdate blk b)
followerInstructionBlocking = fmap runIdentity $
getFollower h followerKey $
instructionHelper registry varFollower blockComponent (fmap Identity . blockUntilJust)
instructionHelper registry varFollower chainType blockComponent (fmap Identity . blockUntilJust)

followerForward :: [Point blk] -> m (Maybe (Point blk))
followerForward = getFollower1 h followerKey $
Expand Down Expand Up @@ -204,6 +206,7 @@ instructionHelper ::
)
=> ResourceRegistry m
-> StrictTVar m (FollowerState m blk b)
-> ChainType
-> BlockComponent blk b
-> ( STM m (Maybe (ChainUpdate blk (Header blk)))
-> STM m (f (ChainUpdate blk (Header blk))))
Expand All @@ -213,12 +216,12 @@ instructionHelper ::
-- @Maybe@.
-> ChainDbEnv m blk
-> m (f (ChainUpdate blk b))
instructionHelper registry varFollower blockComponent fromMaybeSTM CDB{..} = do
instructionHelper registry varFollower chainType blockComponent fromMaybeSTM CDB{..} = do
-- In one transaction: check in which state we are, if in the
-- @FollowerInMem@ state, just call 'instructionSTM', otherwise,
-- return the contents of the 'FollowerInImmutableDB' state.
inImmutableDBOrRes <- atomically $ do
curChain <- readTVar cdbChain
curChain <- getCurrentChainByType
readTVar varFollower >>= \case
-- Just return the contents of the state and end the transaction in
-- these two cases.
Expand Down Expand Up @@ -267,6 +270,14 @@ instructionHelper registry varFollower blockComponent fromMaybeSTM CDB{..} = do
where
trace = traceWith (contramap TraceFollowerEvent cdbTracer)

getCurrentChainByType = do
curChain <- readTVar cdbChain
case chainType of
SelectedChain -> pure curChain
TentativeChain -> readTVar cdbTentativeHeader <&> \case
Just hdr -> curChain AF.:> hdr
Nothing -> curChain

codecConfig :: CodecConfig blk
codecConfig = configCodec cdbTopLevelConfig

Expand Down Expand Up @@ -353,7 +364,7 @@ instructionHelper registry varFollower blockComponent fromMaybeSTM CDB{..} = do
-> do
trace $ FollowerSwitchToMem pt slotNoAtImmutableDBTip
fupdate <- atomically $ fromMaybeSTM $ do
curChain <- readTVar cdbChain
curChain <- getCurrentChainByType
instructionSTM
(RollForwardFrom pt)
curChain
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,7 @@ data ChainDbEnv m blk = CDB
-- blocks back, as opposed to the anchor point of 'cdbChain'.
, cdbTentativeState :: !(StrictTVar m (TentativeState blk))
, cdbTentativeHeader :: !(StrictTVar m (Maybe (Header blk)))
-- ^ The tentative header. Note that we never read from this; it is only
-- maintained for clarity and explicitness.
-- ^ The tentative header, for diffusion pipelining.
--
-- INVARIANT: It fits on top of the current chain, and its body is not known
-- to be invalid, but might turn out to be.
Expand Down

0 comments on commit 4fa8cda

Please sign in to comment.