Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
connection-manager: moved ConnectionState to its own module
  • Loading branch information
coot committed Dec 10, 2024
commit 041a8db34c77451294e9c899220ceae86c89e0d0
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ library
Ouroboros.Network.ConnectionId
Ouroboros.Network.ConnectionManager.Core
Ouroboros.Network.ConnectionManager.InformationChannel
Ouroboros.Network.ConnectionManager.State
Ouroboros.Network.ConnectionManager.Types
Ouroboros.Network.Context
Ouroboros.Network.Driver
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
-- Undecidable instances are need for 'Show' instance of 'ConnectionState'.
{-# LANGUAGE QuantifiedConstraints #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE UndecidableInstances #-}

Expand Down Expand Up @@ -41,9 +39,7 @@ import Control.Monad.Class.MonadTimer.SI
import Control.Monad.Fix
import Control.Tracer (Tracer, contramap, traceWith)
import Data.Foldable (foldMap', traverse_)
import Data.Function (on)
import Data.Functor (void, ($>))
import Data.Maybe (maybeToList)
import Data.Proxy (Proxy (..))
import Data.Typeable (Typeable)
import GHC.Stack (CallStack, HasCallStack, callStack)
Expand All @@ -66,11 +62,11 @@ import Ouroboros.Network.ConnectionManager.InformationChannel
(InformationChannel)
import Ouroboros.Network.ConnectionManager.InformationChannel qualified as InfoChannel
import Ouroboros.Network.ConnectionManager.Types
import Ouroboros.Network.ConnectionManager.State
import Ouroboros.Network.InboundGovernor.Event (NewConnectionInfo (..))
import Ouroboros.Network.MuxMode
import Ouroboros.Network.Server.RateLimiting (AcceptedConnectionsLimit (..))
import Ouroboros.Network.Snocket
import Ouroboros.Network.Testing.Utils (WithName (..))


-- | Arguments for a 'ConnectionManager' which are independent of 'MuxMode'.
Expand Down Expand Up @@ -151,151 +147,13 @@ data Arguments handlerTrace socket peerAddr handle handleError versionNumber ver
}


-- | 'MutableConnState', which supplies a unique identifier.
--
-- TODO: We can get away without id, by tracking connections in
-- `TerminatingState` using a separate priority search queue.
--
data MutableConnState peerAddr handle handleError version m = MutableConnState {
-- | A unique identifier
--
connStateId :: !Int

, -- | Mutable state
--
connVar :: !(StrictTVar m (ConnectionState peerAddr handle handleError
version m))
}


instance Eq (MutableConnState peerAddr handle handleError version m) where
(==) = (==) `on` connStateId


-- | A supply of fresh id's.
--
-- We use a fresh ids for 'MutableConnState'.
--
newtype FreshIdSupply m = FreshIdSupply { getFreshId :: STM m Int }


-- | Create a 'FreshIdSupply' inside an 'STM' monad.
--
newFreshIdSupply :: forall m. MonadSTM m
=> Proxy m -> STM m (FreshIdSupply m)
newFreshIdSupply _ = do
(v :: StrictTVar m Int) <- newTVar 0
let getFreshId :: STM m Int
getFreshId = do
c <- readTVar v
writeTVar v (succ c)
return c
return $ FreshIdSupply { getFreshId }


newMutableConnState :: forall peerAddr handle handleError version m.
( MonadTraceSTM m
, Typeable peerAddr
)
=> peerAddr
-> FreshIdSupply m
-> ConnectionState peerAddr handle handleError
version m
-> STM m (MutableConnState peerAddr handle handleError
version m)
newMutableConnState peerAddr freshIdSupply connState = do
connStateId <- getFreshId freshIdSupply
connVar <- newTVar connState
-- This tracing is a no op in IO.
--
-- We need this for IOSimPOR testing of connection manager state
-- transition tests. It can happen that the transitions happen
-- correctly but IOSimPOR reorders the threads that log the transitions.
-- This is a false positive and we don't want that to happen.
--
-- The simplest way to do so is to leverage the `traceTVar` IOSim
-- capabilities. These trace messages won't be reordered by IOSimPOR
-- since these happen atomically in STM.
--
traceTVar
(Proxy @m) connVar
(\mbPrev curr ->
let currAbs = abstractState (Known curr)
in case mbPrev of
Just prev |
let prevAbs = abstractState (Known prev)
, prevAbs /= currAbs -> pure
$ TraceDynamic
$ WithName connStateId
$ TransitionTrace peerAddr
$ mkAbsTransition prevAbs
currAbs
Nothing -> pure
$ TraceDynamic
$ WithName connStateId
$ TransitionTrace peerAddr
$ mkAbsTransition TerminatedSt
currAbs
_ -> pure DontTrace
)
return $ MutableConnState { connStateId, connVar }


-- | 'ConnectionManager' state: for each peer we keep a 'ConnectionState' in
-- a mutable variable, which reduces congestion on the 'TMVar' which keeps
-- 'ConnectionManagerState'.
--
-- It is important we can lookup by remote @peerAddr@; this way we can find if
-- the connection manager is already managing a connection towards that
-- @peerAddr@ and reuse the 'ConnectionState'.
--
type ConnectionManagerState peerAddr handle handleError version m
= Map peerAddr (MutableConnState peerAddr handle handleError version m)

connectionManagerStateToCounters
:: Map peerAddr (ConnectionState peerAddr handle handleError version m)
-> ConnectionManagerCounters
connectionManagerStateToCounters =
foldMap' connectionStateToCounters

-- | State of a connection.
--
data ConnectionState peerAddr handle handleError version m =
-- | Each outbound connections starts in this state.
ReservedOutboundState

-- | Each inbound connection starts in this state, outbound connection
-- reach this state once `connect` call returns.
--
-- note: the async handle is lazy, because it's passed with 'mfix'.
| UnnegotiatedState !Provenance
!(ConnectionId peerAddr)
(Async m ())

-- | @OutboundState Unidirectional@ state.
| OutboundUniState !(ConnectionId peerAddr) !(Async m ()) !handle

-- | Either @OutboundState Duplex@ or @OutboundState^\tau Duplex@.
| OutboundDupState !(ConnectionId peerAddr) !(Async m ()) !handle !TimeoutExpired

-- | Before connection is reset it is put in 'OutboundIdleState' for the
-- duration of 'outboundIdleTimeout'.
--
| OutboundIdleState !(ConnectionId peerAddr) !(Async m ()) !handle !DataFlow
| InboundIdleState !(ConnectionId peerAddr) !(Async m ()) !handle !DataFlow
| InboundState !(ConnectionId peerAddr) !(Async m ()) !handle !DataFlow
| DuplexState !(ConnectionId peerAddr) !(Async m ()) !handle
| TerminatingState !(ConnectionId peerAddr) !(Async m ()) !(Maybe handleError)
| TerminatedState !(Maybe handleError)


-- | Return 'True' for states in which the connection was already closed.
--
connectionTerminated :: ConnectionState peerAddr handle handleError version m
-> Bool
connectionTerminated TerminatingState {} = True
connectionTerminated TerminatedState {} = True
connectionTerminated _ = False


-- | Perform counting from an 'AbstractState'
Expand Down Expand Up @@ -349,76 +207,6 @@ connectionStateToCounters state =
outboundConn = ConnectionManagerCounters 0 0 0 0 1


instance ( Show peerAddr
, Show handleError
, MonadAsync m
)
=> Show (ConnectionState peerAddr handle handleError version m) where
show ReservedOutboundState = "ReservedOutboundState"
show (UnnegotiatedState pr connId connThread) =
concat ["UnnegotiatedState "
, show pr
, " "
, show connId
, " "
, show (asyncThreadId connThread)
]
show (OutboundUniState connId connThread _handle) =
concat [ "OutboundState Unidirectional "
, show connId
, " "
, show (asyncThreadId connThread)
]
show (OutboundDupState connId connThread _handle expired) =
concat [ "OutboundState "
, show connId
, " "
, show (asyncThreadId connThread)
, " "
, show expired
]
show (OutboundIdleState connId connThread _handle df) =
concat [ "OutboundIdleState "
, show connId
, " "
, show (asyncThreadId connThread)
, " "
, show df
]
show (InboundIdleState connId connThread _handle df) =
concat [ "InboundIdleState "
, show connId
, " "
, show (asyncThreadId connThread)
, " "
, show df
]
show (InboundState connId connThread _handle df) =
concat [ "InboundState "
, show connId
, " "
, show (asyncThreadId connThread)
, " "
, show df
]
show (DuplexState connId connThread _handle) =
concat [ "DuplexState "
, show connId
, " "
, show (asyncThreadId connThread)
]
show (TerminatingState connId connThread handleError) =
concat ([ "TerminatingState "
, show connId
, " "
, show (asyncThreadId connThread)
]
++ maybeToList ((' ' :) . show <$> handleError))
show (TerminatedState handleError) =
concat (["TerminatedState"]
++ maybeToList ((' ' :) . show <$> handleError))


getConnThread :: ConnectionState peerAddr handle handleError version m
-> Maybe (Async m ())
getConnThread ReservedOutboundState = Nothing
Expand Down Expand Up @@ -465,25 +253,6 @@ isInboundConn DuplexState {} = True
isInboundConn TerminatingState {} = False
isInboundConn TerminatedState {} = False


abstractState :: MaybeUnknown (ConnectionState muxMode peerAddr m a b) -> AbstractState
abstractState = \case
Unknown -> UnknownConnectionSt
Race s' -> go s'
Known s' -> go s'
where
go :: ConnectionState muxMode peerAddr m a b -> AbstractState
go ReservedOutboundState {} = ReservedOutboundSt
go (UnnegotiatedState pr _ _) = UnnegotiatedSt pr
go (OutboundUniState _ _ _) = OutboundUniSt
go (OutboundDupState _ _ _ te) = OutboundDupSt te
go (OutboundIdleState _ _ _ df) = OutboundIdleSt df
go (InboundIdleState _ _ _ df) = InboundIdleSt df
go (InboundState _ _ _ df) = InboundSt df
go DuplexState {} = DuplexSt
go TerminatingState {} = TerminatingSt
go TerminatedState {} = TerminatedSt

-- | The default value for 'timeWaitTimeout'.
--
defaultTimeWaitTimeout :: DiffTime
Expand Down
Loading