diff --git a/ouroboros-consensus-diffusion/changelog.d/20231129_105548_nick.frisby_ChainSync_future.md b/ouroboros-consensus-diffusion/changelog.d/20231129_105548_nick.frisby_ChainSync_future.md new file mode 100644 index 0000000000..bbd432b4e0 --- /dev/null +++ b/ouroboros-consensus-diffusion/changelog.d/20231129_105548_nick.frisby_ChainSync_future.md @@ -0,0 +1,23 @@ + + + + + +### Breaking + +- Integrate the new `InFutureCheck` in the ChainSync client, which requires new + fields in `NodeKernelArgs`. diff --git a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Network/NodeToNode.hs b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Network/NodeToNode.hs index cc4eb2dce7..d0131b77e3 100644 --- a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Network/NodeToNode.hs +++ b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Network/NodeToNode.hs @@ -213,7 +213,7 @@ mkHandlers -- ^ Peer Sharing result computation callback -> Handlers m addrNTN blk mkHandlers - NodeKernelArgs {keepAliveRng, miniProtocolParameters} + NodeKernelArgs {chainSyncFutureCheck, keepAliveRng, miniProtocolParameters} NodeKernel {getChainDB, getMempool, getTopLevelConfig, getTracers = tracers} computePeers = Handlers { @@ -224,6 +224,7 @@ (chainSyncPipeliningHighMark miniProtocolParameters)) (contramap (TraceLabelPeer peer) (Node.chainSyncClientTracer tracers)) getTopLevelConfig + chainSyncFutureCheck (defaultChainDbView getChainDB) , hChainSyncServer = \peer _version -> chainSyncHeadersServer diff --git a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node.hs b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node.hs index 8bd7b35580..f1de2b3877 100644 --- a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node.hs +++ b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node.hs @@ -74,6 +74,7 @@ import Ouroboros.Consensus.Fragment.InFuture (CheckInFuture, ClockSkew) import qualified Ouroboros.Consensus.Fragment.InFuture as InFuture import Ouroboros.Consensus.Ledger.Extended (ExtLedgerState (..)) +import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client.InFutureCheck as InFutureCheck import qualified Ouroboros.Consensus.Network.NodeToClient as NTC import qualified Ouroboros.Consensus.Network.NodeToNode as NTN import Ouroboros.Consensus.Node.DbLock @@ -392,6 +393,7 @@ runWith RunNodeArgs{..} encAddrNtN decAddrNtN LowLevelRunNodeArgs{..} = cfg rnTraceConsensus btime + (InFutureCheck.realHeaderInFutureCheck llrnMaxClockSkew systemTime) chainDB nodeKernel <- initNodeKernel nodeKernelArgs rnNodeKernelHook registry nodeKernel @@ -639,6 +641,7 @@ mkNodeKernelArgs -> TopLevelConfig blk -> Tracers m (ConnectionId addrNTN) (ConnectionId addrNTC) blk -> BlockchainTime m + -> InFutureCheck.HeaderInFutureCheck m blk -> ChainDB m blk -> m (NodeKernelArgs m addrNTN (ConnectionId addrNTC) blk) mkNodeKernelArgs @@ -648,6 +651,7 @@ cfg tracers btime + chainSyncFutureCheck chainDB = do return NodeKernelArgs @@ -657,6 +661,7 @@ , btime , chainDB , initChainDB = nodeInitChainDB + , chainSyncFutureCheck , blockFetchSize = estimateBlockSize , mempoolCapacityOverride = NoMempoolCapacityBytesOverride , miniProtocolParameters = defaultMiniProtocolParameters diff --git
a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/NodeKernel.hs b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/NodeKernel.hs index ff73c92667..7e226acc59 100644 --- a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/NodeKernel.hs +++ b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/NodeKernel.hs @@ -55,6 +55,8 @@ import Ouroboros.Consensus.Ledger.SupportsPeerSelection import Ouroboros.Consensus.Ledger.SupportsProtocol import Ouroboros.Consensus.Mempool import qualified Ouroboros.Consensus.MiniProtocol.BlockFetch.ClientInterface as BlockFetchClientInterface +import Ouroboros.Consensus.MiniProtocol.ChainSync.Client.InFutureCheck + (HeaderInFutureCheck) import Ouroboros.Consensus.Node.Run import Ouroboros.Consensus.Node.Tracers import Ouroboros.Consensus.Protocol.Abstract @@ -132,6 +134,7 @@ data NodeKernelArgs m addrNTN addrNTC blk = NodeKernelArgs { , btime :: BlockchainTime m , chainDB :: ChainDB m blk , initChainDB :: StorageConfig blk -> InitChainDB m blk -> m () + , chainSyncFutureCheck :: HeaderInFutureCheck m blk , blockFetchSize :: Header blk -> SizeInBytes , mempoolCapacityOverride :: MempoolCapacityBytesOverride , miniProtocolParameters :: MiniProtocolParameters diff --git a/ouroboros-consensus-diffusion/src/unstable-diffusion-testlib/Test/ThreadNet/Network.hs b/ouroboros-consensus-diffusion/src/unstable-diffusion-testlib/Test/ThreadNet/Network.hs index e8543daa1d..8dd019d403 100644 --- a/ouroboros-consensus-diffusion/src/unstable-diffusion-testlib/Test/ThreadNet/Network.hs +++ b/ouroboros-consensus-diffusion/src/unstable-diffusion-testlib/Test/ThreadNet/Network.hs @@ -70,6 +70,7 @@ import Ouroboros.Consensus.Ledger.SupportsMempool import Ouroboros.Consensus.Ledger.SupportsProtocol import Ouroboros.Consensus.Mempool import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client as CSClient +import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client.InFutureCheck as InFutureCheck import qualified Ouroboros.Consensus.Network.NodeToNode as NTN import Ouroboros.Consensus.Node.ExitPolicy import Ouroboros.Consensus.Node.InitStorage @@ -974,6 +975,10 @@ runThreadNetwork systemTime ThreadNetworkArgs , btime , chainDB , initChainDB = nodeInitChainDB + , chainSyncFutureCheck = + InFutureCheck.realHeaderInFutureCheck + InFuture.defaultClockSkew + (OracularClock.finiteSystemTime clock) , blockFetchSize = estimateBlockSize , mempoolCapacityOverride = NoMempoolCapacityBytesOverride , keepAliveRng = kaRng diff --git a/ouroboros-consensus/changelog.d/20231129_105543_nick.frisby_ChainSync_future.md b/ouroboros-consensus/changelog.d/20231129_105543_nick.frisby_ChainSync_future.md new file mode 100644 index 0000000000..857ac638a7 --- /dev/null +++ b/ouroboros-consensus/changelog.d/20231129_105543_nick.frisby_ChainSync_future.md @@ -0,0 +1,24 @@ + + + + +### Breaking + +- Added a new `InFutureCheck` to the ChainSync client, which requires + additional arguments to the 'chainSyncClient' definition. The node no longer + propagates headers/blocks from the future: a ChainSync client thread now + sleeps until the received header is no longer from the future. 
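The two changelog entries above describe the new behaviour only at a high level. As a rough mental model (a minimal sketch, not code from this patch: `Judgment`, `judge`, and the `skew`/`onset`/`now` parameters are illustrative stand-ins for the real `InFutureCheck` types), the client now classifies each header's arrival time against the onset of the header's claimed slot:

```haskell
module InFutureJudgeSketch where

import Data.Time.Clock (NominalDiffTime)

-- Illustrative stand-ins; the real patch threads these through
-- 'HeaderInFutureCheck' using 'RelativeTime' and 'ClockSkew'.
data Judgment
  = OnTime                      -- ^ onset already reached: proceed as before
  | NearFuture NominalDiffTime  -- ^ early, within skew: sleep this long, then proceed
  | FarFuture  NominalDiffTime  -- ^ early beyond skew: disconnect with an exception

judge :: NominalDiffTime  -- ^ tolerable clock skew
      -> NominalDiffTime  -- ^ onset of the header's slot (relative time)
      -> NominalDiffTime  -- ^ local wall-clock reading at arrival (relative time)
      -> Judgment
judge skew onset now
  | early <= 0    = OnTime
  | early <= skew = NearFuture early
  | otherwise     = FarFuture early
  where
    early = onset - now  -- positive iff the header is from the future
```

The `NearFuture` case corresponds to the new `threadDelay` in `realHeaderInFutureCheck` below; the `FarFuture` case corresponds to the new `InFutureHeaderExceedsClockSkew` exception.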
diff --git a/ouroboros-consensus/ouroboros-consensus.cabal b/ouroboros-consensus/ouroboros-consensus.cabal index 2ff8546700..34f809d940 100644 --- a/ouroboros-consensus/ouroboros-consensus.cabal +++ b/ouroboros-consensus/ouroboros-consensus.cabal @@ -141,6 +141,7 @@ library Ouroboros.Consensus.MiniProtocol.BlockFetch.ClientInterface Ouroboros.Consensus.MiniProtocol.BlockFetch.Server Ouroboros.Consensus.MiniProtocol.ChainSync.Client + Ouroboros.Consensus.MiniProtocol.ChainSync.Client.InFutureCheck Ouroboros.Consensus.MiniProtocol.ChainSync.Server Ouroboros.Consensus.MiniProtocol.LocalStateQuery.Server Ouroboros.Consensus.MiniProtocol.LocalTxMonitor.Server @@ -483,6 +484,7 @@ test-suite consensus-test build-depends: , async , base + , base-deriving-via , cardano-binary , cardano-crypto-class , cardano-slotting @@ -504,6 +506,7 @@ test-suite consensus-test , ouroboros-network-protocols:{ouroboros-network-protocols, testlib} , QuickCheck , quickcheck-state-machine + , quiet , random , serialise , si-timers diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Fragment/InFuture.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Fragment/InFuture.hs index 90065b0a6b..0d3dffcaa3 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Fragment/InFuture.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Fragment/InFuture.hs @@ -16,8 +16,9 @@ module Ouroboros.Consensus.Fragment.InFuture ( -- * Clock skew , clockSkewInSeconds , defaultClockSkew - -- ** opaque + -- ** not exporting the constructor , ClockSkew + , unClockSkew -- * Testing , dontCheck , miracle diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ChainSync/Client.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ChainSync/Client.hs index 3488b2e189..43c088d6e2 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ChainSync/Client.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ChainSync/Client.hs @@ -36,7 +36,8 @@ module Ouroboros.Consensus.MiniProtocol.ChainSync.Client ( , TraceChainSyncClientEvent (..) 
) where -import Control.Monad.Except +import Control.Monad (join) +import Control.Monad.Except (runExcept, throwError) import Control.Tracer import Data.Kind (Type) import Data.Map.Strict (Map) @@ -51,12 +52,16 @@ import NoThunks.Class (unsafeNoThunks) import Ouroboros.Consensus.Block import Ouroboros.Consensus.Config import Ouroboros.Consensus.Forecast +import Ouroboros.Consensus.HardFork.History + (PastHorizonException (PastHorizon)) import Ouroboros.Consensus.HeaderStateHistory (HeaderStateHistory (..), validateHeader) import qualified Ouroboros.Consensus.HeaderStateHistory as HeaderStateHistory import Ouroboros.Consensus.HeaderValidation hiding (validateHeader) +import Ouroboros.Consensus.Ledger.Basics (LedgerState) import Ouroboros.Consensus.Ledger.Extended import Ouroboros.Consensus.Ledger.SupportsProtocol +import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client.InFutureCheck as InFutureCheck import Ouroboros.Consensus.Node.NetworkProtocolVersion import Ouroboros.Consensus.Protocol.Abstract import Ouroboros.Consensus.Storage.ChainDB (ChainDB, @@ -64,6 +69,7 @@ import Ouroboros.Consensus.Storage.ChainDB (ChainDB, import qualified Ouroboros.Consensus.Storage.ChainDB as ChainDB import Ouroboros.Consensus.Util import Ouroboros.Consensus.Util.Assert (assertWithMsg) +import qualified Ouroboros.Consensus.Util.EarlyExit as EarlyExit import Ouroboros.Consensus.Util.IOLike import Ouroboros.Consensus.Util.STM (Fingerprint, Watcher (..), WithFingerprint (..), withWatcher) @@ -426,6 +432,7 @@ chainSyncClient => MkPipelineDecision -> Tracer m (TraceChainSyncClientEvent blk) -> TopLevelConfig blk + -> InFutureCheck.HeaderInFutureCheck m blk -> ChainDbView m blk -> NodeToNodeVersion -> ControlMessageSTM m @@ -433,6 +440,13 @@ chainSyncClient -> StrictTVar m (AnchoredFragment (Header blk)) -> Consensus ChainSyncClientPipelined blk m chainSyncClient mkPipelineDecision0 tracer cfg + InFutureCheck.HeaderInFutureCheck + { -- these fields in order of use + proxyArrival = Proxy :: Proxy arrival + , recordHeaderArrival + , judgeHeaderArrival + , handleHeaderArrival + } ChainDbView { getCurrentChain , getHeaderStateHistory @@ -706,104 +720,156 @@ chainSyncClient mkPipelineDecision0 tracer cfg (ClientPipelinedStIdle n) rollForward mkPipelineDecision n hdr theirTip = Stateful $ \kis -> traceException $ do - now <- getMonotonicTime - let hdrPoint = headerPoint hdr - - isInvalidBlock <- atomically $ forgetFingerprint <$> getIsInvalidBlock - let disconnectWhenInvalid = \case - GenesisHash -> pure () - BlockHash hash -> + arrival <- recordHeaderArrival hdr + now <- getMonotonicTime + let hdrPoint = headerPoint hdr + slotNo = blockSlot hdr + + do + let scrutinee = + case isPipeliningEnabled version of + NotReceivingTentativeBlocks -> BlockHash (headerHash hdr) + -- Disconnect if the parent block of `hdr` is known to be invalid. + ReceivingTentativeBlocks -> headerPrevHash hdr + case scrutinee of + GenesisHash -> return () + BlockHash hash -> do + -- If the peer is sending headers quickly, the + -- @invalidBlockWatcher@ might miss one. So this call is a + -- lightweight supplement. Note that neither check /must/ be 100% + -- reliable. + isInvalidBlock <- atomically $ forgetFingerprint <$> getIsInvalidBlock whenJust (isInvalidBlock hash) $ \reason -> disconnect $ InvalidBlock hdrPoint hash reason - disconnectWhenInvalid $ - case isPipeliningEnabled version of - -- Disconnect if the parent block of `hdr` is known to be invalid. 
- ReceivingTentativeBlocks -> headerPrevHash hdr - NotReceivingTentativeBlocks -> BlockHash (headerHash hdr) - - -- Get the ledger view required to validate the header - -- NOTE: This will block if we are too far behind. - intersectCheck <- atomically $ do - -- Before obtaining a 'LedgerView', we must find the most recent - -- intersection with the current chain. Note that this is cheap when - -- the chain and candidate haven't changed. - mKis' <- intersectsWithCurrentChain kis - case mKis' of - Nothing -> return NoLongerIntersects - Just kis'@KnownIntersectionState { mostRecentIntersection } -> do - -- We're calling 'ledgerViewForecastAt' in the same STM transaction - -- as 'intersectsWithCurrentChain'. This guarantees the former's - -- precondition: the intersection is within the last @k@ blocks of - -- the current chain. - forecast <- + + mLedgerView <- EarlyExit.withEarlyExit $ do + Intersects kis2 lst <- checkArrivalTime kis arrival + Intersects kis3 ledgerView <- case projectLedgerView slotNo lst of + Just ledgerView -> pure $ Intersects kis2 ledgerView + Nothing -> readLedgerState kis2 (projectLedgerView slotNo) + pure $ Intersects kis3 ledgerView + + case mLedgerView of + + Nothing -> do + -- The above computation exited early, which means our chain (tip) + -- has changed and it no longer intersects with the candidate + -- fragment, so we have to find a new intersection. But first drain + -- the pipe. + continueWithState () + $ drainThePipe n + $ findIntersection NoMoreIntersection + + Just (Intersects kis' ledgerView) -> do + -- Our chain still intersects with the candidate fragment and we + -- have obtained a 'LedgerView' that we can use to validate @hdr@. + let KnownIntersectionState { + ourFrag + , theirFrag + , theirHeaderStateHistory + , mostRecentIntersection + } = kis' + + -- Validate header + theirHeaderStateHistory' <- + case runExcept $ validateHeader cfg ledgerView hdr theirHeaderStateHistory of + Right theirHeaderStateHistory' -> return theirHeaderStateHistory' + Left vErr -> + disconnect $ + HeaderError hdrPoint vErr (ourTipFromChain ourFrag) theirTip + + let theirFrag' = theirFrag :> hdr + -- Advance the most recent intersection if we have the same + -- header on our fragment too. This is cheaper than recomputing + -- the intersection from scratch. + mostRecentIntersection' + | Just ourSuccessor <- + AF.successorBlock (castPoint mostRecentIntersection) ourFrag + , headerHash ourSuccessor == headerHash hdr + = headerPoint hdr + | otherwise + = mostRecentIntersection + kis'' = assertKnownIntersectionInvariants (configConsensus cfg) $ + KnownIntersectionState { + theirFrag = theirFrag' + , theirHeaderStateHistory = theirHeaderStateHistory' + , ourFrag = ourFrag + , mostRecentIntersection = mostRecentIntersection' + } + atomically $ writeTVar varCandidate theirFrag' + atomically $ traceWith headerMetricsTracer (slotNo, now) + + continueWithState kis'' $ nextStep mkPipelineDecision n theirTip + + -- Used in 'rollForward': determines whether the header is from the future, + -- and handles that fact if so. It also returns the ledger state used for + -- the determination. + -- + -- Relies on 'readLedgerState'.
+ checkArrivalTime :: KnownIntersectionState blk + -> arrival + -> EarlyExit.WithEarlyExit m (Intersects blk (LedgerState blk)) + checkArrivalTime kis arrival = do + Intersects kis' (lst, judgment) <- readLedgerState kis $ \lst -> + case runExcept $ judgeHeaderArrival (configLedger cfg) lst arrival of + Left PastHorizon{} -> Nothing + Right judgment -> Just (lst, judgment) + + -- For example, throw an exception if the header is from the far + -- future. + EarlyExit.lift $ handleHeaderArrival judgment >>= \case + Just exn -> disconnect (InFutureHeaderExceedsClockSkew exn) + Nothing -> return $ Intersects kis' lst + + -- Used in 'rollForward': block until the ledger state at the + -- intersection with the local selection returns 'Just'. + -- + -- Exits early if the intersection no longer exists. + readLedgerState :: KnownIntersectionState blk + -> (LedgerState blk -> Maybe a) + -> EarlyExit.WithEarlyExit m (Intersects blk a) + readLedgerState kis prj = + join $ EarlyExit.lift $ readLedgerStateHelper kis prj + + readLedgerStateHelper :: KnownIntersectionState blk + -> (LedgerState blk -> Maybe a) + -> m (EarlyExit.WithEarlyExit m (Intersects blk a)) + readLedgerStateHelper kis prj = atomically $ do + -- We must first find the most recent intersection with the current + -- chain. Note that this is cheap when the chain and candidate haven't + -- changed. + intersectsWithCurrentChain kis >>= \case + Nothing -> return EarlyExit.exitEarly + Just kis' -> do + let KnownIntersectionState { mostRecentIntersection } = kis' + lst <- maybe (error $ "intersection not within last k blocks: " <> show mostRecentIntersection) - (ledgerViewForecastAt (configLedger cfg) . ledgerState) + ledgerState <$> getPastLedger mostRecentIntersection - - case runExcept $ forecastFor forecast (blockSlot hdr) of - -- The header is too far ahead of the intersection point with our - -- current chain. We have to wait until our chain and the - -- intersection have advanced far enough. This will wait on - -- changes to the current chain via the call to - -- 'intersectsWithCurrentChain' before it. - Left OutsideForecastRange{} -> - retry - Right ledgerView -> - return $ Intersects kis' ledgerView - - case intersectCheck of - NoLongerIntersects -> - -- Our chain (tip) has changed and it no longer intersects with the - -- candidate fragment, so we have to find a new intersection, but - -- first drain the pipe. - continueWithState () - $ drainThePipe n - $ findIntersection NoMoreIntersection - - Intersects kis' ledgerView -> do - -- Our chain still intersects with the candidate fragment and we - -- have obtained a 'LedgerView' that we can use to validate @hdr@. - - let KnownIntersectionState { - ourFrag - , theirFrag - , theirHeaderStateHistory - , mostRecentIntersection - } = kis' - - -- Validate header - theirHeaderStateHistory' <- - case runExcept $ validateHeader cfg ledgerView hdr theirHeaderStateHistory of - Right theirHeaderStateHistory' -> return theirHeaderStateHistory' - Left vErr -> - disconnect $ - HeaderError hdrPoint vErr (ourTipFromChain ourFrag) theirTip - - let theirFrag' = theirFrag :> hdr - -- Advance the most recent intersection if we have the same header - -- on our fragment too. This is cheaper than recomputing the - -- intersection from scratch.
- mostRecentIntersection' - | Just ourSuccessor <- - AF.successorBlock (castPoint mostRecentIntersection) ourFrag - , headerHash ourSuccessor == headerHash hdr - = headerPoint hdr - | otherwise - = mostRecentIntersection - kis'' = assertKnownIntersectionInvariants (configConsensus cfg) $ - KnownIntersectionState { - theirFrag = theirFrag' - , theirHeaderStateHistory = theirHeaderStateHistory' - , ourFrag = ourFrag - , mostRecentIntersection = mostRecentIntersection' - } - atomically $ writeTVar varCandidate theirFrag' - let slotNo = blockSlot hdr - atomically $ traceWith headerMetricsTracer (slotNo, now) - - continueWithState kis'' $ nextStep mkPipelineDecision n theirTip + case prj lst of + Nothing -> retry + Just ledgerView -> return $ return $ Intersects kis' ledgerView + + -- Used in 'rollForward': returns 'Nothing' if the ledger state cannot + -- forecast the ledger view that far into the future. + projectLedgerView :: SlotNo + -> LedgerState blk + -> Maybe (LedgerView (BlockProtocol blk)) + projectLedgerView slot lst = + let forecast = ledgerViewForecastAt (configLedger cfg) lst + -- TODO cache this in the KnownIntersectionState? Or even in the + -- LedgerDB? + in + case runExcept $ forecastFor forecast slot of + -- The header is too far ahead of the intersection point with our + -- current chain. We have to wait until our chain and the + -- intersection have advanced far enough. This will wait on + -- changes to the current chain via the call to + -- 'intersectsWithCurrentChain' before it. + Left OutsideForecastRange{} -> Nothing + Right ledgerView -> Just ledgerView rollBackward :: MkPipelineDecision -> Nat n @@ -1024,16 +1090,10 @@ invalidBlockRejector tracer version getIsInvalidBlock getCandidate = throwIO ex -- | Auxiliary data type used as an intermediary result in 'rollForward'. -data IntersectCheck blk = - -- | The upstream chain no longer intersects with our current chain because - -- our current chain changed in the background. - NoLongerIntersects - -- | The upstream chain still intersects with our chain, return the - -- resulting 'KnownIntersectionState' and the 'LedgerView' corresponding to - -- the header 'rollForward' received. - | Intersects - (KnownIntersectionState blk) - (LedgerView (BlockProtocol blk)) +data Intersects blk a = + Intersects + (KnownIntersectionState blk) + a {------------------------------------------------------------------------------- Explicit state @@ -1159,6 +1219,8 @@ data ChainSyncClientException = -- different from the previous argument. 
(InvalidBlockReason blk) + | InFutureHeaderExceedsClockSkew !InFutureCheck.HeaderArrivalException + deriving instance Show ChainSyncClientException instance Eq ChainSyncClientException where @@ -1180,6 +1242,10 @@ instance Eq ChainSyncClientException where Just Refl -> (a, b, c) == (a', b', c') InvalidBlock{} == _ = False + InFutureHeaderExceedsClockSkew a == InFutureHeaderExceedsClockSkew a' = + a == a' + InFutureHeaderExceedsClockSkew{} == _ = False + instance Exception ChainSyncClientException {------------------------------------------------------------------------------- diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ChainSync/Client/InFutureCheck.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ChainSync/Client/InFutureCheck.hs new file mode 100644 index 0000000000..88b004a276 --- /dev/null +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ChainSync/Client/InFutureCheck.hs @@ -0,0 +1,145 @@ +{-# LANGUAGE ExistentialQuantification #-} +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE StandaloneDeriving #-} +{-# LANGUAGE TypeApplications #-} + +module Ouroboros.Consensus.MiniProtocol.ChainSync.Client.InFutureCheck ( + -- * Interface + HeaderInFutureCheck (..) + -- * Real Implementation + , HeaderArrivalException (..) + , realHeaderInFutureCheck + ) where + +import Control.Exception (Exception) +import Control.Monad (guard, unless) +import Control.Monad.Class.MonadTimer.SI (MonadDelay, threadDelay) +import Control.Monad.Except (Except, liftEither) +import Data.Proxy (Proxy (Proxy)) +import Data.Time.Clock (NominalDiffTime) +import Data.Type.Equality ((:~:) (Refl)) +import Data.Typeable (eqT) +import Ouroboros.Consensus.Block.Abstract (Header) +import Ouroboros.Consensus.Block.RealPoint (RealPoint, + headerRealPoint, realPointSlot) +import Ouroboros.Consensus.BlockchainTime.WallClock.Types + (RelativeTime, SystemTime, diffRelTime, systemTimeCurrent) +import Ouroboros.Consensus.Fragment.InFuture (ClockSkew, unClockSkew) +import Ouroboros.Consensus.HardFork.Abstract (HasHardForkHistory, + hardForkSummary) +import Ouroboros.Consensus.HardFork.History (PastHorizonException) +import Ouroboros.Consensus.HardFork.History.Qry (runQuery, + slotToWallclock) +import Ouroboros.Consensus.Ledger.Basics (LedgerConfig, LedgerState) +import Ouroboros.Consensus.Util.Time (nominalDelay) +import Ouroboros.Network.Block (HasHeader) + +{------------------------------------------------------------------------------- + Interface +-------------------------------------------------------------------------------} + +-- | The interface a ChainSync client needs in order to check the arrival time +-- of headers. +-- +-- The fields are ordered not alphabetically but in the order in which the +-- ChainSync client logic will invoke them for each header. +data HeaderInFutureCheck m blk = forall arrival judgment. HeaderInFutureCheck { + proxyArrival :: Proxy arrival + , + -- | This is ideally called _immediately_ upon the header arriving. + recordHeaderArrival :: Header blk -> m arrival + , + -- | Judge what to do about the header's arrival time. + -- + -- Note that this may be called after a delay, hence @arrival@ contains at + -- least the arrival time. + -- + -- In particular, such a delay might be caused by waiting for the + -- intersection with the local selection to change after this function + -- returns 'Ouroboros.Consensus.HardFork.History.PastHorizon'.
+ judgeHeaderArrival :: + LedgerConfig blk + -> LedgerState blk + -> arrival + -> Except PastHorizonException judgment + , + -- | Enact the judgment. + -- + -- If @Just@ is returned, an exception should be raised. + handleHeaderArrival :: judgment -> m (Maybe HeaderArrivalException) + } + +{------------------------------------------------------------------------------- + Real implementation +-------------------------------------------------------------------------------} + +data HeaderArrivalException = + -- | The header arrived so early that its issuer either minted it before + -- their clock reached its slot onset or else the difference between their + -- clock and ours is more severe than we're configured to tolerate. + -- + -- INVARIANT: @'tolerableClockSkew' < negate 'ageUponArrival'@ + forall blk. HasHeader blk => FarFutureHeaderException { + ageUponArrival :: !NominalDiffTime + , + arrivedPoint :: !(RealPoint blk) + , + arrivalTime :: !RelativeTime + , + tolerableClockSkew :: !NominalDiffTime + } + +deriving instance Show HeaderArrivalException + +instance Exception HeaderArrivalException + +instance Eq HeaderArrivalException where + (==) + (FarFutureHeaderException l0 (l1 :: RealPoint l) l2 l3) + (FarFutureHeaderException r0 (r1 :: RealPoint r) r2 r3) + = case eqT @l @r of + Nothing -> False + Just Refl -> (l0, l1, l2, l3) == (r0, r1, r2, r3) + +realHeaderInFutureCheck :: + ( HasHeader blk + , HasHeader (Header blk) + , HasHardForkHistory blk + , MonadDelay m + ) + => ClockSkew -> SystemTime m -> HeaderInFutureCheck m blk +realHeaderInFutureCheck skew systemTime = HeaderInFutureCheck { + proxyArrival = Proxy + , recordHeaderArrival = \hdr -> do + (,) (headerRealPoint hdr) <$> systemTimeCurrent systemTime + , judgeHeaderArrival = \lcfg lst (p, arrivalTime_) -> do + let qry = slotToWallclock (realPointSlot p) + hfSummary = hardForkSummary lcfg lst + -- TODO cache this in the KnownIntersectionState? Or even in the + -- LedgerDB? + (onset, _slotLength) <- liftEither $ runQuery qry hfSummary + pure (p, arrivalTime_, onset) + , handleHeaderArrival = \(p, arrivalTime_, onset) -> do + let ageUponArrival_ = arrivalTime_ `diffRelTime` onset + tooEarly = unClockSkew skew < negate ageUponArrival_ + -- TODO leap seconds? + + -- this delay is the simple part of Ouroboros Chronos + unless tooEarly $ do + now <- systemTimeCurrent systemTime + let ageNow = now `diffRelTime` onset + syntheticDelay = negate ageNow + threadDelay $ nominalDelay syntheticDelay -- TODO leap seconds?
+ -- recall that threadDelay ignores negative arguments + + pure $ do + guard tooEarly -- no exception if within skew + pure FarFutureHeaderException { + ageUponArrival = ageUponArrival_ + , arrivedPoint = p + , arrivalTime = arrivalTime_ + , tolerableClockSkew = unClockSkew skew + } + } diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Util/Time.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Util/Time.hs index 6fce425df9..b43a7f99e7 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Util/Time.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Util/Time.hs @@ -1,11 +1,22 @@ module Ouroboros.Consensus.Util.Time ( - -- Conversions - nominalDelay + multipleNominalDelay + , nominalDelay , secondsToNominalDiffTime ) where import Data.Time (DiffTime, NominalDiffTime) +{------------------------------------------------------------------------------- + Operations +-------------------------------------------------------------------------------} + +-- | Multiply a 'NominalDiffTime' by an integer +-- +-- The right conversions to use are somewhat tricky. The key fact is that +-- 'fromIntegral' interprets its argument as seconds. +multipleNominalDelay :: Integral a => NominalDiffTime -> a -> NominalDiffTime +multipleNominalDelay dur i = dur * fromIntegral i + {------------------------------------------------------------------------------- Conversions -------------------------------------------------------------------------------} diff --git a/ouroboros-consensus/test/consensus-test/Test/Consensus/MiniProtocol/ChainSync/Client.hs b/ouroboros-consensus/test/consensus-test/Test/Consensus/MiniProtocol/ChainSync/Client.hs index 59f50b3874..3a45492bc3 100644 --- a/ouroboros-consensus/test/consensus-test/Test/Consensus/MiniProtocol/ChainSync/Client.hs +++ b/ouroboros-consensus/test/consensus-test/Test/Consensus/MiniProtocol/ChainSync/Client.hs @@ -1,11 +1,16 @@ {-# LANGUAGE BangPatterns #-} +{-# LANGUAGE DeriveFunctor #-} +{-# LANGUAGE DeriveGeneric #-} +{-# LANGUAGE DerivingStrategies #-} +{-# LANGUAGE DerivingVia #-} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE GADTs #-} {-# LANGUAGE LambdaCase #-} +{-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE RankNTypes #-} -{-# LANGUAGE RecordWildCards #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TypeApplications #-} + -- | Tests for the chain sync client. -- -- The chain sync client is a stateful component that tracks the chain of an @@ -22,24 +27,53 @@ -- them to be thrown based on the mock state changes (exceptions such as -- "fork is deep", "up-stream node asked for an invalid rollback", etc.). -- +-- The client's (simulated) wall-clock matters in this test because the +-- ChainSync client has special handling for headers that arrive before the +-- wall-clock reaches the onset of the header's claimed slot, which is +-- inevitable even with only honest peers due to non-global clocks +-- drifting/etc. This test advances time in a way that is unrealistic but does +-- allow for some headers to arrive early (but not so early that the client +-- disconnects from the server). +-- +-- The approach to the clocks is as follows. A logical clock drives the whole +-- test; it ticks along the naturals. Each tick causes the local and upstream +-- chains to update and that's the primary content of the whole test. 
However, + -- the /first/ thing that happens at the start of each logical tick is that the + -- client's simulated wall-clock advances (via a single 'threadDelay' call) to + -- the onset of the greatest slot involved in any of that logical tick's + -- server-side chain updates /less/ the randomly-chosen local clock skew. Thus, + -- if the greatest header involved in some logical tick is part of an upstream + -- chain update, then it will arrive as a future header (but only near-future, + -- never far-future). (Client-side updates are also handled, but slightly + -- differently; see the code comments.) Finally, recall that the @io-sim@ layer + -- means those delays happen nearly instantaneously with respect to the real + -- world wall-clock. module Test.Consensus.MiniProtocol.ChainSync.Client (tests) where import Cardano.Crypto.DSIGN.Mock -import Control.Monad (unless, void, when) +import Cardano.Slotting.Slot (WithOrigin (..)) +import Control.Monad (forM_, unless, void, when) import Control.Monad.Class.MonadThrow (Handler (..), catches) +import Control.Monad.Class.MonadTime (MonadTime, getCurrentTime) import Control.Monad.IOSim (runSimOrThrow) -import Control.Tracer (contramap, nullTracer) -import Data.Bifunctor (first) +import Control.Tracer (contramap, contramapM, nullTracer) +import Data.DerivingVia (InstantiatedAt (InstantiatedAt)) import Data.List (intercalate) +import qualified Data.Map.Merge.Strict as Map import qualified Data.Map.Strict as Map import Data.Maybe (isJust) +import Data.Semigroup (Max (Max), getMax) import qualified Data.Set as Set +import Data.Time (diffUTCTime) import Data.Typeable +import GHC.Generics (Generic) import Network.TypedProtocol.Channel import Network.TypedProtocol.Driver.Simple import Ouroboros.Consensus.Block import Ouroboros.Consensus.BlockchainTime import Ouroboros.Consensus.Config +import Ouroboros.Consensus.Fragment.InFuture (ClockSkew, + clockSkewInSeconds, unClockSkew) import qualified Ouroboros.Consensus.HardFork.History as HardFork import Ouroboros.Consensus.HeaderStateHistory (HeaderStateHistory (..)) @@ -47,6 +81,9 @@ import qualified Ouroboros.Consensus.HeaderStateHistory as HeaderStateHistory import Ouroboros.Consensus.Ledger.Abstract import Ouroboros.Consensus.Ledger.Extended hiding (ledgerState) import Ouroboros.Consensus.MiniProtocol.ChainSync.Client +import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client.InFutureCheck as InFutureCheck +import Ouroboros.Consensus.Node.NetworkProtocolVersion + (NodeToNodeVersion) import Ouroboros.Consensus.Node.ProtocolInfo import Ouroboros.Consensus.NodeId import Ouroboros.Consensus.Protocol.BFT @@ -57,7 +94,9 @@ import Ouroboros.Consensus.Util.Condense import Ouroboros.Consensus.Util.IOLike import Ouroboros.Consensus.Util.ResourceRegistry import Ouroboros.Consensus.Util.STM (Fingerprint (..), - WithFingerprint (..), forkLinkedWatcher) + WithFingerprint (..)) +import Ouroboros.Consensus.Util.Time (multipleNominalDelay, + nominalDelay) import Ouroboros.Network.AnchoredFragment (AnchoredFragment) import qualified Ouroboros.Network.AnchoredFragment as AF import Ouroboros.Network.Block (getTipPoint) @@ -74,13 +113,13 @@ import Ouroboros.Network.Protocol.ChainSync.PipelineDecision (pipelineDecisionLowHighMark) import Ouroboros.Network.Protocol.ChainSync.Server import Ouroboros.Network.Protocol.ChainSync.Type (ChainSync) +import Quiet (Quiet (..)) import Test.QuickCheck import Test.Tasty import Test.Tasty.QuickCheck import Test.Util.ChainUpdates (ChainUpdate (..), UpdateBehavior (..), genChainUpdates,
toChainUpdates) -import qualified Test.Util.LogicalClock as LogicalClock -import Test.Util.LogicalClock (NumTicks (..), Tick (..)) +import Test.Util.LogicalClock (Tick (..)) import Test.Util.Orphans.Arbitrary () import Test.Util.Orphans.IOLike () import Test.Util.Schedule (Schedule (..), genSchedule, joinSchedule, @@ -103,7 +142,16 @@ tests = testGroup "ChainSyncClient" -------------------------------------------------------------------------------} prop_chainSync :: ChainSyncClientSetup -> Property -prop_chainSync ChainSyncClientSetup {..} = +prop_chainSync testSetup@ChainSyncClientSetup { + securityParam + , clientUpdates + , serverUpdates + , startTick + , invalidBlocks + , clientSlowBy + } = + tabulate "TickArrivalTimeStats" [show (tickArrivalTimeStats traceEvents)] $ + counterexample (prettyChainSyncClientSetup testSetup) $ counterexample ("Client chain: " <> ppChain finalClientChain <> "\n" <> "Server chain: " <> ppChain finalServerChain <> "\n" <> @@ -113,23 +161,25 @@ prop_chainSync ChainSyncClientSetup {..} = -- it, but not the other way around: we don't check whether a situation -- has occurred where an exception should have been thrown, but wasn't. case mbResult of - Just (Right (ForkTooDeep intersection _ _)) -> + Just (ClientFinished (ForkTooDeep intersection _ _)) -> label "ForkTooDeep" $ counterexample ("ForkTooDeep intersection: " <> ppPoint intersection) $ not (withinFragmentBounds intersection clientFragment) - Just (Right (NoMoreIntersection (Our ourTip) (Their theirTip))) -> + Just (ClientFinished (NoMoreIntersection (Our ourTip) (Their theirTip))) -> label "NoMoreIntersection" $ counterexample ("NoMoreIntersection ourHead: " <> ppPoint (getTipPoint ourTip) <> ", theirHead: " <> ppPoint (getTipPoint theirTip)) $ not (clientFragment `forksWithinK` syncedFragment) - Just (Right (RolledBackPastIntersection intersection _ _)) -> + Just (ClientFinished (RolledBackPastIntersection intersection _ _)) -> label "RolledBackPastIntersection" $ counterexample ("RolledBackPastIntersection intersection: " <> ppPoint intersection) $ not (withinFragmentBounds intersection syncedFragment) - Just (Right result) -> + Just (ClientFinished result) -> counterexample ("Terminated with result: " ++ show result) False - Just (Left ex) -> + Just (ClientThrew ex) -> counterexample ("Exception: " ++ displayException ex) False + Just (ClientSelectedFutureTip ft) -> + counterexample ("Client selected future tip: " ++ show ft) False Nothing -> counterexample "Synced fragment not a suffix of the server chain" (syncedFragment `isSuffixOf` finalServerChain) .&&. @@ -140,8 +190,20 @@ where k = maxRollbacks securityParam - ChainSyncOutcome {..} = runSimOrThrow $ - runChainSync securityParam clientUpdates serverUpdates invalidBlocks startTick + ChainSyncOutcome { + finalClientChain + , finalServerChain + , mbResult + , syncedFragment + , traceEvents + } = runSimOrThrow $ + runChainSync + (slotLengthTenthsToClockSkew clientSlowBy) + securityParam + clientUpdates + serverUpdates + invalidBlocks + startTick clientFragment = AF.anchorNewest k $ Chain.toAnchoredFragment finalClientChain @@ -185,6 +247,10 @@ isSuffixOf fragment chain = serverId :: CoreNodeId serverId = CoreNodeId 1 +-- | The schedule that determines the evolution of the local chain. +-- +-- Note that the 'TestBlock' used in this test is constructed in such a way +-- that the block's slot number equals its block number.
newtype ClientUpdates = ClientUpdates { getClientUpdates :: Schedule ChainUpdate } deriving (Show) @@ -201,7 +267,7 @@ newtype InvalidBlocks = InvalidBlocks { getInvalidBlocks :: Schedule TestHash } deriving (Show) -type TraceEvent = (Tick, Either +type TraceEvent = (Tick, RelativeTime, Either (TraceChainSyncClientEvent TestBlock) (TraceSendRecv (ChainSync (Header TestBlock) (Point TestBlock) (Tip TestBlock)))) @@ -209,7 +275,7 @@ data ChainSyncOutcome = ChainSyncOutcome { finalClientChain :: Chain TestBlock , finalServerChain :: Chain TestBlock , syncedFragment :: AnchoredFragment TestBlock - , mbResult :: Maybe (Either ChainSyncClientException ChainSyncClientResult) + , mbResult :: Maybe ChainSyncClientTestResult , traceEvents :: [TraceEvent] } @@ -237,18 +303,42 @@ -- Note that updates that are scheduled before the time at which we start -- syncing help generate different chains to start syncing from. runChainSync - :: forall m. IOLike m - => SecurityParam + :: forall m. (IOLike m, MonadTime m) + => ClockSkew + -> SecurityParam -> ClientUpdates -> ServerUpdates -> InvalidBlocks -> Tick -- ^ Start chain syncing at this time -> m ChainSyncOutcome -runChainSync securityParam (ClientUpdates clientUpdates) +runChainSync skew securityParam (ClientUpdates clientUpdates) (ServerUpdates serverUpdates) (InvalidBlocks invalidBlocks) startSyncingAt = withRegistry $ \registry -> do - clock <- LogicalClock.new registry numTicks + clientSystemTime <- do + initialIoSimClockValue <- getCurrentTime + pure SystemTime { + systemTimeWait = pure () + , systemTimeCurrent = do + now <- getCurrentTime + -- Subtracting the initial @io-sim@ wall clock to create this + -- 'RelativeTime' causes the test to behave as if the local + -- node and the peer were invoked when the "true" wall clock + -- (which the server's clock happens to equal) is at exactly + -- the onset of Slot 0. + pure $ RelativeTime $ + (now `diffUTCTime` initialIoSimClockValue) + - + unClockSkew skew + } + let _ = clientSystemTime :: SystemTime m + + varCurrentLogicalTick <- uncheckedNewTVarM (Tick 0) + let clockUpdates :: Schedule NewMaxSlot + clockUpdates = + mkClockUpdates + (ClientUpdates clientUpdates) + (ServerUpdates serverUpdates) -- Set up the client varCandidates <- uncheckedNewTVarM Map.empty @@ -261,7 +351,13 @@ -- at the final state of each candidate. varFinalCandidates <- uncheckedNewTVarM Map.empty - (tracer, getTrace) <- first (LogicalClock.tickTracer clock) <$> recordingTracerTVar + (tracer, getTrace) <- do + (tracer', getTrace) <- recordingTracerTVar + let pairWithNow ev = do + logicalNow <- readTVarIO varCurrentLogicalTick + now <- systemTimeCurrent clientSystemTime + pure (logicalNow, now, ev) + pure (contramapM pairWithNow tracer', getTrace) let chainSyncTracer = contramap Left tracer protocolTracer = contramap Right tracer @@ -293,6 +389,12 @@ pure $ WithFingerprint isInvalidBlock fp } + headerInFutureCheck :: InFutureCheck.HeaderInFutureCheck m TestBlock + headerInFutureCheck = + InFutureCheck.realHeaderInFutureCheck skew clientSystemTime + -- Note that this test passes in the exact difference between the + -- client's and server's clock as the tolerable clock skew.
+ client :: StrictTVar m (AnchoredFragment (Header TestBlock)) -> Consensus ChainSyncClientPipelined TestBlock @@ -301,8 +403,9 @@ runChainSync securityParam (ClientUpdates clientUpdates) (pipelineDecisionLowHighMark 10 20) chainSyncTracer nodeCfg + headerInFutureCheck chainDbView - maxBound + (maxBound :: NodeToNodeVersion) (return Continue) nullTracer @@ -312,99 +415,156 @@ runChainSync securityParam (ClientUpdates clientUpdates) (Tip TestBlock) m () server = chainSyncServerExample () varChainProducerState getHeader - -- Schedule updates of the client and server chains - varLastUpdate <- uncheckedNewTVarM 0 - let forkLinkedTickWatcher :: (Tick -> m ()) -> m () - forkLinkedTickWatcher = - void - . forkLinkedWatcher registry "scheduled updates" - . LogicalClock.tickWatcher clock - forkLinkedTickWatcher $ \tick -> do - -- Stop updating the client and server chains when the chain sync client - -- has thrown an exception or has gracefully terminated, so that at the - -- end, we can read the chains in the states they were in when the - -- exception was thrown. - stop <- fmap isJust $ atomically $ readTVar varClientResult - unless stop $ do - -- Newly discovered invalid blocks - whenJust (Map.lookup tick (getSchedule invalidBlocks)) $ - atomically . modifyTVar varKnownInvalid . Set.union . Set.fromList - - -- Client - whenJust (Map.lookup tick (getSchedule clientUpdates)) $ \chainUpdates -> - atomically $ modifyTVar varClientState $ updateClientState chainUpdates - - -- Server - whenJust (Map.lookup tick (getSchedule serverUpdates)) $ \chainUpdates -> - atomically $ do - chainProducerState <- readTVar varChainProducerState - case CPS.applyChainUpdates - (toChainUpdates chainUpdates) - chainProducerState of - Just chainProducerState' -> - writeTVar varChainProducerState chainProducerState' - Nothing -> - error $ "Invalid chainUpdates: " <> show chainUpdates <> - " for " <> show (chainState chainProducerState) - atomically $ writeTVar varLastUpdate tick + let advanceWallClockForTick :: Tick -> m () + advanceWallClockForTick tick = do + doTick clockUpdates tick $ \case + [newMaxSlot] -> do + let target = case newMaxSlot of + NewMaxClientSlot slot -> toOnset slot + NewMaxServerSlot slot -> toSkewedOnset slot + + NewMaxClientAndServerSlot cslot sslot -> + toOnset cslot `max` toSkewedOnset sslot + now <- systemTimeCurrent clientSystemTime + threadDelay $ nominalDelay $ target `diffRelTime` now + + _ -> error "impossible! bad mkClockUpdates" + + -- Do scheduled updates of the client and server chains + let updateChainsDuringTick :: Tick -> m () + updateChainsDuringTick tick = do + -- Stop updating the client and server chains when the chain sync client + -- has thrown an exception or has gracefully terminated, so that at the + -- end, we can read the chains in the states they were in when the + -- exception was thrown. + stop <- fmap isJust $ atomically $ readTVar varClientResult + unless stop $ do + -- Newly discovered invalid blocks + whenJust (Map.lookup tick (getSchedule invalidBlocks)) $ + atomically . modifyTVar varKnownInvalid . Set.union . Set.fromList + + -- TODO interleave the client and server chain update + -- applications in a more interesting way? 
+ + -- Client + doTick clientUpdates tick $ \chainUpdates -> + atomically $ modifyTVar varClientState $ updateClientState chainUpdates + + -- Server + doTick serverUpdates tick $ \chainUpdates -> + atomically $ do + chainProducerState <- readTVar varChainProducerState + case CPS.applyChainUpdates + (toChainUpdates chainUpdates) + chainProducerState of + Just chainProducerState' -> + writeTVar varChainProducerState chainProducerState' + Nothing -> + error $ "Invalid chainUpdates: " <> show chainUpdates <> + " for " <> show (chainState chainProducerState) -- Connect client to server and run the chain sync protocol - LogicalClock.onTick registry clock "startSyncing" startSyncingAt $ do - -- When updates are planned at the same time that we start syncing, we - -- wait until these updates are done before we start syncing. - when (Map.member startSyncingAt (getSchedule clientUpdates) || - Map.member startSyncingAt (getSchedule serverUpdates)) $ - atomically $ do - lastUpdate <- readTVar varLastUpdate - check (lastUpdate == startSyncingAt) - - (clientChannel, serverChannel) <- createConnectedChannels - -- Don't link the thread (which will cause the exception to be rethrown - -- in the main thread), just catch the exception and store it, because - -- we want a "regular ending". - void $ forkThread registry "ChainSyncClient" $ - bracketChainSyncClient - chainSyncTracer - chainDbView - varCandidates - serverId - maxBound $ \varCandidate -> do - atomically $ modifyTVar varFinalCandidates $ - Map.insert serverId varCandidate - result <- - runPipelinedPeer protocolTracer codecChainSyncId clientChannel $ - chainSyncClientPeerPipelined $ client varCandidate - atomically $ writeTVar varClientResult (Just (Right result)) - return () - `catchAlsoLinked` \ex -> do - atomically $ writeTVar varClientResult (Just (Left ex)) - -- Rethrow, but it will be ignored anyway. - throwIO ex - void $ forkLinkedThread registry "ChainSyncServer" $ - runPeer nullTracer codecChainSyncId serverChannel - (chainSyncServerPeer server) - - LogicalClock.waitUntilDone clock - -- Wait a random amount of time after the final tick for the chain sync - -- to finish - threadDelay 2000 + -- + -- Happens /immediately after/ the chain and clock effects schedule for + -- 'startSyncingAt'. + let initiateChainSync = do + (clientChannel, serverChannel) <- createConnectedChannels + -- Don't link the thread (which will cause the exception to be + -- rethrown in the main thread), just catch the exception and store + -- it, because we want a "regular ending". + void $ forkThread registry "ChainSyncClient" $ + bracketChainSyncClient + chainSyncTracer + chainDbView + varCandidates + serverId + maxBound $ \varCandidate -> do + atomically $ modifyTVar varFinalCandidates $ + Map.insert serverId varCandidate + result <- + runPipelinedPeer protocolTracer codecChainSyncId clientChannel $ + chainSyncClientPeerPipelined $ client varCandidate + atomically $ writeTVar varClientResult (Just (ClientFinished result)) + return () + `catchAlsoLinked` \ex -> do + atomically $ writeTVar varClientResult (Just (ClientThrew ex)) + -- Rethrow, but it will be ignored anyway. + throwIO ex + void $ forkLinkedThread registry "ChainSyncServer" $ + runPeer nullTracer codecChainSyncId serverChannel + (chainSyncServerPeer server) + + -- If the candidate's tip's slot's onset is ahead of the local wall-clock + -- (which is skewed by 'clientSlowBy'), then the ChainSync client + -- mishandled a block from the future. 
+ let checkTipTime :: m () + checkTipTime = do + now <- systemTimeCurrent clientSystemTime + candidates <- atomically $ + readTVar varCandidates >>= traverse readTVar + forM_ candidates $ \candidate -> do + let p = castPoint $ AF.headPoint candidate :: Point TestBlock + case pointSlot p of + Origin -> pure () + At slot -> when (now < toOnset slot) $ do + atomically $ writeTVar varClientResult $ Just + $ ClientSelectedFutureTip $ FutureTip { + ftNow = now + , ftPoint = (toOnset slot, p) + } + + do + let loop tick = do + -- first update the clocks + advanceWallClockForTick tick + atomically $ writeTVar varCurrentLogicalTick tick + + -- then do the messages + updateChainsDuringTick tick + when (tick == startSyncingAt) $ initiateChainSync + + -- check the invariants before advancing the clock again + -- + -- This is not a perfect check, since the server's chain may have + -- violated the invariant ephemerally (ie due to a subsequent + -- rollback during the same logical tick). However, other + -- QuickCheck seeds/counterexamples should trigger such a bug in + -- a non-ephemeral way. + checkTipTime + + when (tick < finalTick) $ loop (tick + 1) + loop (Tick 1) + + -- This delay seems enough to let all threads finish their final work. + -- + -- TODO what is the necessary threshold? + threadDelay 86400 traceEvents <- getTrace -- Collect the return values atomically $ do - finalClientChain <- readTVar varClientState - finalServerChain <- chainState <$> readTVar varChainProducerState + finalClientChain <- readTVar varClientState + finalServerChain <- chainState <$> readTVar varChainProducerState candidateFragment <- readTVar varFinalCandidates >>= readTVar . (Map.! serverId) - mbResult <- readTVar varClientResult + mbResult <- readTVar varClientResult return ChainSyncOutcome { - syncedFragment = AF.mapAnchoredFragment testHeader candidateFragment - , .. + finalClientChain + , finalServerChain + , mbResult + , syncedFragment = AF.mapAnchoredFragment testHeader candidateFragment + , traceEvents } where k = maxRollbacks securityParam - slotLength :: SlotLength - slotLength = slotLengthFromSec 20 + toSkewedOnset :: SlotNo -> RelativeTime + toSkewedOnset slot = + let RelativeTime onset = toOnset slot + in + RelativeTime $ onset - unClockSkew skew + + doTick :: Schedule a -> Tick -> ([a] -> m ()) -> m () + doTick sched tick kont = whenJust (Map.lookup tick (getSchedule sched)) kont nodeCfg :: TopLevelConfig TestBlock nodeCfg = TopLevelConfig { @@ -431,8 +591,8 @@ runChainSync securityParam (ClientUpdates clientUpdates) numCoreNodes :: NumCoreNodes numCoreNodes = NumCoreNodes 2 - numTicks :: NumTicks - numTicks = LogicalClock.sufficientTimeFor + finalTick :: Tick + finalTick = maximum [ lastTick clientUpdates , lastTick serverUpdates , startSyncingAt @@ -444,6 +604,24 @@ runChainSync securityParam (ClientUpdates clientUpdates) , Handler $ \(ExceptionInLinkedThread _ ex) -> throwIO ex `catch` handler ] +-- | See 'ClientSelectedFutureTip' +data FutureTip = FutureTip { + ftNow :: RelativeTime + -- ^ when the header was selected prematurely + , ftPoint :: (RelativeTime, Point TestBlock) + -- ^ point of the header that was selected prematurely, and the + -- 'RelativeTime' of its slot's onset + } + deriving (Show) + +data ChainSyncClientTestResult = + ClientFinished !ChainSyncClientResult + -- ^ This is only a property failure if the result was unjustified. + | ClientSelectedFutureTip !FutureTip + -- ^ This is always a property failure. 
+ | ClientThrew !ChainSyncClientException + -- ^ This is only a property failure if the exception was unjustified. + updateClientState :: [ChainUpdate] -> Chain TestBlock -> Chain TestBlock updateClientState chainUpdates chain = case Chain.applyChainUpdates (toChainUpdates chainUpdates) chain of @@ -502,6 +680,29 @@ computeHeaderStateHistory cfg = ChainSyncClientSetup -------------------------------------------------------------------------------} +slotLength :: SlotLength +slotLength = slotLengthFromSec $ toEnum slotLengthInSeconds + +slotLengthInSeconds :: Int +slotLengthInSeconds = 10 + +-- | The onset of the slot +toOnset :: SlotNo -> RelativeTime +toOnset slot = RelativeTime $ + multipleNominalDelay + (getSlotLength slotLength) + (unSlotNo slot) + +-- | Tenths of a slot length +-- +-- This adds some fractionality to the test without over-complicating it. +newtype SlotLengthTenths = SlotLengthTenths Int + deriving (Show) + +slotLengthTenthsToClockSkew :: SlotLengthTenths -> ClockSkew +slotLengthTenthsToClockSkew (SlotLengthTenths tenths) = + clockSkewInSeconds $ (toEnum slotLengthInSeconds * toEnum tenths) / 10 + -- | Bundle dependent arguments for test generation data ChainSyncClientSetup = ChainSyncClientSetup { securityParam :: SecurityParam @@ -513,7 +714,12 @@ data ChainSyncClientSetup = ChainSyncClientSetup -- ^ Depends on 'clientUpdates' and 'serverUpdates' , invalidBlocks :: InvalidBlocks -- ^ Blocks that are discovered to be invalid. + , clientSlowBy :: SlotLengthTenths + -- ^ The server's clock minus the client's clock. + -- + -- This is also passed to the code-under-test as the tolerable clock skew. } + deriving (Show) instance Arbitrary ChainSyncClientSetup where arbitrary = do @@ -535,8 +741,23 @@ instance Arbitrary ChainSyncClientSetup where , tbValid b == Invalid ] invalidBlocks <- InvalidBlocks <$> (genSchedule =<< shuffle trapBlocks) - return ChainSyncClientSetup {..} - shrink cscs@ChainSyncClientSetup {..} = + + clientSlowBy <- SlotLengthTenths <$> choose (0, 50) + + return ChainSyncClientSetup { + securityParam + , clientUpdates + , serverUpdates + , startTick + , invalidBlocks + , clientSlowBy + } + shrink cscs@ChainSyncClientSetup { + clientUpdates + , serverUpdates + , startTick + , clientSlowBy + } = -- We don't shrink 'securityParam' because the updates depend on it -- We also don't shrink 'invalidBlocks' right now (as it does not impact @@ -570,12 +791,21 @@ instance Arbitrary ChainSyncClientSetup where , lastTick (getServerUpdates serverUpdates) - 1 ] , startTick' <- [1..min startTick maxStartTick] + ] <> + [ cscs { clientSlowBy = SlotLengthTenths y } + | let SlotLengthTenths x = clientSlowBy + , y <- shrink x ] -instance Show ChainSyncClientSetup where - show ChainSyncClientSetup {..} = unlines +prettyChainSyncClientSetup :: ChainSyncClientSetup -> String +prettyChainSyncClientSetup testSetup = + unlines [ "ChainSyncClientSetup:" , "securityParam: " <> show (maxRollbacks securityParam) + , "clientSlowBy: " <> show (unClockSkew skew) + , "--" + , "clockUpdates:" + , condense (mkClockUpdates clientUpdates serverUpdates) <> "--" , "clientUpdates:" , condense (getClientUpdates clientUpdates) <> "--" , "serverUpdates:" @@ -584,6 +814,19 @@ instance Show ChainSyncClientSetup where , "invalidBlocks: " , condense (getInvalidBlocks invalidBlocks) ] + where + -- if you add a field to this pattern to avoid warnings, add it below too + ChainSyncClientSetup _ _ _ _ _ _dummy = testSetup + ChainSyncClientSetup { + securityParam + , clientSlowBy + , clientUpdates + , 
serverUpdates + , startTick + , invalidBlocks + } = testSetup + + skew = slotLengthTenthsToClockSkew clientSlowBy -- | Remove client updates that happen at a tick after the tick in which the -- last server updates happened. @@ -617,6 +860,64 @@ genUpdateSchedule genUpdateSchedule updateBehavior securityParam = genChainUpdates updateBehavior securityParam 10 >>= genSchedule +data NewMaxSlot = + NewMaxClientSlot SlotNo + -- ^ the client's chain reaches a new greatest slot + | NewMaxServerSlot SlotNo + -- ^ the server's chain reaches a new greatest slot + | NewMaxClientAndServerSlot SlotNo SlotNo + -- ^ both the client and the server's chain reach a new greatest slot, + -- respectively + deriving (Show) + +instance Condense NewMaxSlot where + condense = \case + NewMaxClientSlot slot -> "c" <> condense slot <> "|s_" + NewMaxServerSlot slot -> "c_|s" <> condense slot + + NewMaxClientAndServerSlot cslot sslot -> + "c" <> condense cslot <> "|s" <> condense sslot + +-- | The schedule of when the the client and server chains reach a new greatest +-- slot, respectively. +-- +-- The resulting schedule has exactly one entry per tick in the map (ie no +-- simultaneity). Moreover, it's monotonic within the client and within the +-- server, but not necessarily in their union. +-- +-- We need to track them separately because the client selecting a block in a +-- slot implies the local clock has reached surpassed that onset, whereas the +-- server doing so does not. +mkClockUpdates :: ClientUpdates -> ServerUpdates -> Schedule NewMaxSlot +mkClockUpdates = \(ClientUpdates cupds) (ServerUpdates supds) -> + Schedule + $ Map.map ((:[])) + $ Map.merge + (Map.mapMissing $ \_ -> NewMaxClientSlot) + (Map.mapMissing $ \_ -> NewMaxServerSlot) + (Map.zipWithMatched $ \_ -> NewMaxClientAndServerSlot) + (newMaxes cupds) + (newMaxes supds) + where + newMaxes :: Schedule ChainUpdate -> Map.Map Tick SlotNo + newMaxes = + makeMonotonic + . Map.mapMaybe (fmap getMax . foldMap maxSlot) + . getSchedule + + maxSlot :: ChainUpdate -> Maybe (Max SlotNo) + maxSlot = foldMap (Just . Max . blockSlot) . 
\case + AddBlock b -> [b] + SwitchFork _ bs -> bs + + makeMonotonic :: (Eq k, Ord v) => Map.Map k v -> Map.Map k v + makeMonotonic mp = Map.fromAscList $ case Map.toAscList mp of + [] -> [] + (k0, x) : xs -> (k0, x) : go x xs + go acc = \case + [] -> [] + (k, x) : xs -> if x > acc then (k, x) : go x xs else go acc xs + {------------------------------------------------------------------------------- Pretty-printing -------------------------------------------------------------------------------} @@ -638,6 +939,65 @@ ppBlocks :: Point TestBlock -> [TestBlock] -> String ppBlocks a bs = ppPoint a <> " ] " <> intercalate " :> " (map ppBlock bs) ppTraceEvent :: TraceEvent -> String -ppTraceEvent (Tick n, ev) = show n <> " | " <> case ev of +ppTraceEvent (Tick n, RelativeTime t, ev) = show (n, t) <> " | " <> case ev of Left cl -> "Client: " <> show cl Right pt -> "Protocol: " <> show pt + +{------------------------------------------------------------------------------- + Classifying examples +-------------------------------------------------------------------------------} + +data TickArrivalTimeStats a = OnlyNotEarly_SomeEarly { + onlyNotEarlyTATS :: !a + -- ^ Logical ticks in which some headers are arriving but none are from the + -- future + , someEarlyTATS :: !a + -- ^ Logical ticks in which some headers are arriving from the future + } + deriving (Functor, Generic) + deriving (Show) via (Quiet (TickArrivalTimeStats a)) + deriving (Monoid, Semigroup) via + (InstantiatedAt Generic (TickArrivalTimeStats a)) + +data ZOM = Zero | One | Many + deriving (Show) + +sizeZOM :: Set.Set a -> ZOM +sizeZOM x = case Set.size x of + 0 -> Zero + 1 -> One + _ -> Many -- NB negatives are impossible + +tickArrivalTimeStats :: [TraceEvent] -> TickArrivalTimeStats ZOM +tickArrivalTimeStats events = + fmap sizeZOM $ + OnlyNotEarly_SomeEarly { + onlyNotEarlyTATS = onlyNotEarly `Set.difference` someEarly + , someEarlyTATS = someEarly + } + where + -- if you add a field to this pattern to avoid warnings, add it below too + OnlyNotEarly_SomeEarly _ _dummy = tickArrivalTimes events + OnlyNotEarly_SomeEarly { + onlyNotEarlyTATS = onlyNotEarly + , someEarlyTATS = someEarly + } = tickArrivalTimes events + +-- | WARNING 'onlyNotEarlyTATS' is instead merely @someNotEarlyTATS@ in this +-- codomain: it might overlap with the 'someEarlyTATs' field +tickArrivalTimes :: [TraceEvent] -> TickArrivalTimeStats (Set.Set Tick) +tickArrivalTimes = foldMap $ \case + (n, now, Left (TraceDownloadedHeader hdr)) -> + let onset = toOnset (blockSlot hdr) + thisTick = Set.singleton n + in + if now < onset + then OnlyNotEarly_SomeEarly { + onlyNotEarlyTATS = Set.empty + , someEarlyTATS = thisTick + } + else OnlyNotEarly_SomeEarly { + onlyNotEarlyTATS = thisTick + , someEarlyTATS = Set.empty + } + _ -> mempty