diff --git a/decentralized-message-queue/src/DMQ/Configuration.hs b/decentralized-message-queue/src/DMQ/Configuration.hs index 92e4a496315..42aede3d859 100644 --- a/decentralized-message-queue/src/DMQ/Configuration.hs +++ b/decentralized-message-queue/src/DMQ/Configuration.hs @@ -214,6 +214,7 @@ mkDiffusionConfiguration , Diffusion.dcBulkChurnInterval = dmqcChurnInterval , Diffusion.dcMuxForkPolicy = Diffusion.noBindForkPolicy -- TODO: Make option flag for responderForkPolicy , Diffusion.dcLocalMuxForkPolicy = Diffusion.noBindForkPolicy -- TODO: Make option flag for responderForkPolicy + , Diffusion.dcEgressPollInterval = 0 -- TODO: Make option flag for egress poll interval } where hints = defaultHints { diff --git a/network-mux/bench/socket_read_write/Main.hs b/network-mux/bench/socket_read_write/Main.hs index e933ef93526..a07b58a25a6 100644 --- a/network-mux/bench/socket_read_write/Main.hs +++ b/network-mux/bench/socket_read_write/Main.hs @@ -201,11 +201,11 @@ startServerMany sndSizeV ad = forever $ do -- It will send streams of data over the 41 and 42 miniprotocol. -- Multiplexing is done with a separate thread running -- the Egress.muxer function. -startServerEgresss :: StrictTMVar IO Int64 -> Socket -> IO () -startServerEgresss sndSizeV ad = forever $ do +startServerEgresss :: DiffTime -> StrictTMVar IO Int64 -> Socket -> IO () +startServerEgresss pollInterval sndSizeV ad = forever $ do (sd, _) <- Socket.accept ad withReadBufferIO (\buffer -> do - bearer <-getBearer makeSocketBearer sduTimeout activeTracer sd buffer + bearer <- getBearer (makeSocketBearer' pollInterval) sduTimeout activeTracer sd buffer sndSize <- atomically $ takeTMVar sndSizeV eq <- atomically $ newTBQueue 100 w42 <- newTVarIO BL.empty @@ -273,25 +273,28 @@ main = do ad1 <- Socket.socket Socket.AF_INET Socket.Stream Socket.defaultProtocol ad2 <- Socket.socket Socket.AF_INET Socket.Stream Socket.defaultProtocol ad3 <- Socket.socket Socket.AF_INET Socket.Stream Socket.defaultProtocol + ad4 <- Socket.socket Socket.AF_INET Socket.Stream Socket.defaultProtocol - return (ad1, ad2, ad3) + return (ad1, ad2, ad3, ad4) ) - (\(ad1, ad2, ad3) -> do + (\(ad1, ad2, ad3, ad4) -> do Socket.close ad1 Socket.close ad2 Socket.close ad3 + Socket.close ad4 ) - (\(ad1, ad2, ad3) -> do + (\(ad1, ad2, ad3, ad4) -> do sndSizeV <- newEmptyTMVarIO sndSizeMV <- newEmptyTMVarIO sndSizeEV <- newEmptyTMVarIO addr <- setupServer ad1 addrM <- setupServer ad2 addrE <- setupServer ad3 + addrF <- setupServer ad4 withAsync (startServer sndSizeV ad1) $ \said -> do withAsync (startServerMany sndSizeMV ad2) $ \saidM -> do - withAsync (startServerEgresss sndSizeEV ad3) $ \saidE -> do + withAsync (startServerEgresss 0.001 sndSizeEV ad3) $ \saidE -> withAsync (startServerEgresss 0 sndSizeEV ad4) $ \saidF -> do defaultMain [ -- Suggested Max SDU size for Socket bearer bench "Read/Write Benchmark 12288 byte SDUs" $ nfIO $ readBenchmark sndSizeV 12288 addr @@ -305,9 +308,13 @@ main = do , bench "Read/Write-Many Benchmark 914 byte SDUs" $ nfIO $ readBenchmark sndSizeMV 914 addrM , bench "Read/Write-Many Benchmark 10 byte SDUs" $ nfIO $ readBenchmark sndSizeMV 10 addrM - -- Use standard muxer and demuxer - , bench "Read/Write Mux Benchmark 800+10 byte SDUs" $ nfIO $ readDemuxerBenchmark sndSizeEV 800 addrE - , bench "Read/Write Mux Benchmark 12288+10 byte SDUs" $ nfIO $ readDemuxerBenchmark sndSizeEV 12288 addrE + -- Use standard muxer and demuxer, 1ms poll + , bench "Read/Write Mux Benchmark 800+10 byte SDUs, 1ms Poll" $ nfIO $ readDemuxerBenchmark sndSizeEV 800 addrE + , bench "Read/Write Mux Benchmark 12288+10 byte SDUs, 1ms Poll" $ nfIO $ readDemuxerBenchmark sndSizeEV 12288 addrE + + -- Use standard muxer and demuxer, 0ms poll + , bench "Read/Write Mux Benchmark 800+10 byte SDUs, 0ms Poll" $ nfIO $ readDemuxerBenchmark sndSizeEV 800 addrF + , bench "Read/Write Mux Benchmark 12288+10 byte SDUs, 0ms Poll" $ nfIO $ readDemuxerBenchmark sndSizeEV 12288 addrF -- Use standard demuxer , bench "Read/Write Demuxer Queuing Benchmark 10 byte SDUs" $ nfIO $ readDemuxerQueueBenchmark sndSizeV 10 addr @@ -316,4 +323,5 @@ main = do cancel said cancel saidM cancel saidE + cancel saidF ) diff --git a/network-mux/src/Network/Mux/Bearer.hs b/network-mux/src/Network/Mux/Bearer.hs index b2a85e2ed48..b733f032b39 100644 --- a/network-mux/src/Network/Mux/Bearer.hs +++ b/network-mux/src/Network/Mux/Bearer.hs @@ -10,6 +10,7 @@ module Network.Mux.Bearer ( Bearer (..) , MakeBearer (..) , makeSocketBearer + , makeSocketBearer' , makePipeChannelBearer , makeQueueChannelBearer #if defined(mingw32_HOST_OS) @@ -61,8 +62,11 @@ pureBearer f = \sduTimeout rb tr fd -> pure (f sduTimeout rb tr fd) makeSocketBearer :: MakeBearer IO Socket -makeSocketBearer = MakeBearer $ (\sduTimeout tr fd rb -> do - return $ socketAsBearer size batch rb sduTimeout tr fd) +makeSocketBearer = makeSocketBearer' 0 + +makeSocketBearer' :: DiffTime -> MakeBearer IO Socket +makeSocketBearer' pt = MakeBearer $ (\sduTimeout tr fd rb -> do + return $ socketAsBearer size batch rb sduTimeout pt tr fd) where size = SDUSize 12_288 batch = 131_072 @@ -89,13 +93,13 @@ makeQueueChannelBearer :: ( MonadSTM m , MonadThrow m ) => MakeBearer m (QueueChannel m) -makeQueueChannelBearer = MakeBearer $ pureBearer (\_ tr q _-> queueChannelAsBearer size tr q) +makeQueueChannelBearer = MakeBearer $ pureBearer (\_ tr q _ -> queueChannelAsBearer size tr q) where size = SDUSize 1_280 #if defined(mingw32_HOST_OS) makeNamedPipeBearer :: MakeBearer IO HANDLE -makeNamedPipeBearer = MakeBearer $ pureBearer (\_ tr fd _-> namedPipeAsBearer size tr fd) +makeNamedPipeBearer = MakeBearer $ pureBearer (\_ tr fd _ -> namedPipeAsBearer size tr fd) where size = SDUSize 24_576 #endif diff --git a/network-mux/src/Network/Mux/Bearer/AttenuatedChannel.hs b/network-mux/src/Network/Mux/Bearer/AttenuatedChannel.hs index d1e756d88c6..710ca2f55b7 100644 --- a/network-mux/src/Network/Mux/Bearer/AttenuatedChannel.hs +++ b/network-mux/src/Network/Mux/Bearer/AttenuatedChannel.hs @@ -273,12 +273,13 @@ attenuationChannelAsBearer :: forall m. -> Bearer m attenuationChannelAsBearer sduSize sduTimeout muxTracer chan = Bearer { - read = readMux, - write = writeMux, - writeMany = writeMuxMany, + read = readMux, + write = writeMux, + writeMany = writeMuxMany, sduSize, - batchSize = fromIntegral $ getSDUSize sduSize, - name = "attenuation-channel" + batchSize = fromIntegral $ getSDUSize sduSize, + name = "attenuation-channel", + egressInterval = 0 } where readMux :: TimeoutFn m -> m (SDU, Time) diff --git a/network-mux/src/Network/Mux/Bearer/NamedPipe.hs b/network-mux/src/Network/Mux/Bearer/NamedPipe.hs index 30753531844..9e3a2da55b7 100644 --- a/network-mux/src/Network/Mux/Bearer/NamedPipe.hs +++ b/network-mux/src/Network/Mux/Bearer/NamedPipe.hs @@ -33,12 +33,13 @@ namedPipeAsBearer :: Mx.SDUSize -> Mx.Bearer IO namedPipeAsBearer sduSize tracer h = Mx.Bearer { - Mx.read = readNamedPipe, - Mx.write = writeNamedPipe, - Mx.writeMany = writeNamedPipeMany, - Mx.sduSize = sduSize, - Mx.batchSize = fromIntegral $ Mx.getSDUSize sduSize, - Mx.name = "named-pipe" + Mx.read = readNamedPipe, + Mx.write = writeNamedPipe, + Mx.writeMany = writeNamedPipeMany, + Mx.sduSize = sduSize, + Mx.batchSize = fromIntegral $ Mx.getSDUSize sduSize, + Mx.name = "named-pipe", + Mx.egressInterval = 0 } where readNamedPipe :: Mx.TimeoutFn IO -> IO (Mx.SDU, Time) diff --git a/network-mux/src/Network/Mux/Bearer/Pipe.hs b/network-mux/src/Network/Mux/Bearer/Pipe.hs index 174d0f302fb..6f2796b23e4 100644 --- a/network-mux/src/Network/Mux/Bearer/Pipe.hs +++ b/network-mux/src/Network/Mux/Bearer/Pipe.hs @@ -75,12 +75,13 @@ pipeAsBearer -> Bearer IO pipeAsBearer sduSize tracer channel = Mx.Bearer { - Mx.read = readPipe, - Mx.write = writePipe, - Mx.writeMany = writePipeMany, - Mx.sduSize = sduSize, - Mx.name = "pipe", - Mx.batchSize = fromIntegral $ Mx.getSDUSize sduSize + Mx.read = readPipe, + Mx.write = writePipe, + Mx.writeMany = writePipeMany, + Mx.sduSize = sduSize, + Mx.name = "pipe", + Mx.batchSize = fromIntegral $ Mx.getSDUSize sduSize, + Mx.egressInterval = 0 } where readPipe :: Mx.TimeoutFn IO -> IO (Mx.SDU, Time) diff --git a/network-mux/src/Network/Mux/Bearer/Queues.hs b/network-mux/src/Network/Mux/Bearer/Queues.hs index 29c7769eef1..e41bf27f3c9 100644 --- a/network-mux/src/Network/Mux/Bearer/Queues.hs +++ b/network-mux/src/Network/Mux/Bearer/Queues.hs @@ -40,12 +40,13 @@ queueChannelAsBearer -> Bearer m queueChannelAsBearer sduSize tracer QueueChannel { writeQueue, readQueue } = do Mx.Bearer { - Mx.read = readMux, - Mx.write = writeMux, - Mx.writeMany = writeMuxMany, - Mx.sduSize = sduSize, - Mx.batchSize = 2 * (fromIntegral $ Mx.getSDUSize sduSize), - Mx.name = "queue-channel" + Mx.read = readMux, + Mx.write = writeMux, + Mx.writeMany = writeMuxMany, + Mx.sduSize = sduSize, + Mx.batchSize = 2 * (fromIntegral $ Mx.getSDUSize sduSize), + Mx.name = "queue-channel", + Mx.egressInterval = 0 } where readMux :: Mx.TimeoutFn m -> m (Mx.SDU, Time) diff --git a/network-mux/src/Network/Mux/Bearer/Socket.hs b/network-mux/src/Network/Mux/Bearer/Socket.hs index 2b96d37f697..ecd26842fa5 100644 --- a/network-mux/src/Network/Mux/Bearer/Socket.hs +++ b/network-mux/src/Network/Mux/Bearer/Socket.hs @@ -52,17 +52,19 @@ socketAsBearer -> Int -> Maybe (Mx.ReadBuffer IO) -> DiffTime + -> DiffTime -> Tracer IO Mx.Trace -> Socket.Socket -> Bearer IO -socketAsBearer sduSize batchSize readBuffer_m sduTimeout tracer sd = +socketAsBearer sduSize batchSize readBuffer_m sduTimeout pollInterval tracer sd = Mx.Bearer { - Mx.read = readSocket, - Mx.write = writeSocket, - Mx.writeMany = writeSocketMany, - Mx.sduSize = sduSize, - Mx.batchSize = batchSize, - Mx.name = "socket-bearer" + Mx.read = readSocket, + Mx.write = writeSocket, + Mx.writeMany = writeSocketMany, + Mx.sduSize = sduSize, + Mx.batchSize = batchSize, + Mx.name = "socket-bearer", + Mx.egressInterval = pollInterval } where readSocket :: Mx.TimeoutFn IO -> IO (Mx.SDU, Time) diff --git a/network-mux/src/Network/Mux/Egress.hs b/network-mux/src/Network/Mux/Egress.hs index 1f0f614b441..9a6d64c7788 100644 --- a/network-mux/src/Network/Mux/Egress.hs +++ b/network-mux/src/Network/Mux/Egress.hs @@ -144,7 +144,7 @@ muxer => EgressQueue m -> Bearer m -> m void -muxer egressQueue Bearer { writeMany, sduSize, batchSize } = +muxer egressQueue Bearer { writeMany, sduSize, batchSize, egressInterval } = withTimeoutSerial $ \timeout -> forever $ do start <- getMonotonicTime @@ -156,12 +156,9 @@ muxer egressQueue Bearer { writeMany, sduSize, batchSize } = empty <- atomically $ isEmptyTBQueue egressQueue when (empty) $ do let delta = diffTime end start - threadDelay (loopInterval - delta) + threadDelay (egressInterval - delta) where - loopInterval :: DiffTime - loopInterval = 0.001 - maxSDUsPerBatch :: Int maxSDUsPerBatch = 100 diff --git a/network-mux/src/Network/Mux/Types.hs b/network-mux/src/Network/Mux/Types.hs index 9cf2e862a26..333de87688b 100644 --- a/network-mux/src/Network/Mux/Types.hs +++ b/network-mux/src/Network/Mux/Types.hs @@ -244,17 +244,19 @@ msHeaderLength = 8 -- data Bearer m = Bearer { -- | Timestamp and send SDU. - write :: TimeoutFn m -> SDU -> m Time + write :: TimeoutFn m -> SDU -> m Time -- | Timestamp and send many SDUs. - , writeMany :: TimeoutFn m -> [SDU] -> m Time + , writeMany :: TimeoutFn m -> [SDU] -> m Time -- | Read a SDU - , read :: TimeoutFn m -> m (SDU, Time) + , read :: TimeoutFn m -> m (SDU, Time) -- | Return a suitable SDU payload size. - , sduSize :: SDUSize + , sduSize :: SDUSize -- | Return a suitable batch size - , batchSize :: Int + , batchSize :: Int -- | Name of the bearer - , name :: String + , name :: String + -- | Egress poll interval + , egressInterval :: DiffTime } newtype SDUSize = SDUSize { getSDUSize :: Word16 } diff --git a/network-mux/test/Test/Mux.hs b/network-mux/test/Test/Mux.hs index cc0ab27ff9b..8a93c815c42 100644 --- a/network-mux/test/Test/Mux.hs +++ b/network-mux/test/Test/Mux.hs @@ -919,7 +919,7 @@ runWithSocket cap clientBuf_m serverBuf_m initApps respApps = withIOManager (\io ) ) where - mkBearer buf_m sock tr = getBearer makeSocketBearer (-1) tr sock buf_m + mkBearer buf_m sock tr = getBearer (makeSocketBearer' 0.001) (-1) tr sock buf_m clientTracer = contramap (Mx.WithBearer "client") activeTracer serverTracer = contramap (Mx.WithBearer "server") activeTracer diff --git a/ouroboros-network-framework/sim-tests/Test/Ouroboros/Network/ConnectionManager.hs b/ouroboros-network-framework/sim-tests/Test/Ouroboros/Network/ConnectionManager.hs index 37100de8cf7..38b557b2aaa 100644 --- a/ouroboros-network-framework/sim-tests/Test/Ouroboros/Network/ConnectionManager.hs +++ b/ouroboros-network-framework/sim-tests/Test/Ouroboros/Network/ConnectionManager.hs @@ -347,12 +347,13 @@ makeFDBearer :: MonadDelay m => MakeBearer m (FD m) makeFDBearer = MakeBearer $ \_ _ _ _ -> return Mx.Bearer { - Mx.write = \_ _ -> getMonotonicTime, - Mx.writeMany = \_ _ -> getMonotonicTime, - Mx.read = \_ -> forever (threadDelay 3600), - Mx.sduSize = Mx.SDUSize 1500, - Mx.batchSize = 1500, - Mx.name = "FD" + Mx.write = \_ _ -> getMonotonicTime, + Mx.writeMany = \_ _ -> getMonotonicTime, + Mx.read = \_ -> forever (threadDelay 3600), + Mx.sduSize = Mx.SDUSize 1500, + Mx.batchSize = 1500, + Mx.name = "FD", + Mx.egressInterval = 0 } -- | We only keep exceptions here which should not be handled by the test diff --git a/ouroboros-network-framework/src/Ouroboros/Network/Snocket.hs b/ouroboros-network-framework/src/Ouroboros/Network/Snocket.hs index 076fba7671f..fa6dd75a660 100644 --- a/ouroboros-network-framework/src/Ouroboros/Network/Snocket.hs +++ b/ouroboros-network-framework/src/Ouroboros/Network/Snocket.hs @@ -21,6 +21,7 @@ module Ouroboros.Network.Snocket , AddressFamily (..) , Snocket (..) , makeSocketBearer + , makeSocketBearer' , makeLocalRawBearer -- ** Socket based Snockets , SocketSnocket diff --git a/ouroboros-network/CHANGELOG.md b/ouroboros-network/CHANGELOG.md index 92ba3508d7d..6ed6d8abaed 100644 --- a/ouroboros-network/CHANGELOG.md +++ b/ouroboros-network/CHANGELOG.md @@ -37,6 +37,7 @@ - Renamed `Arguments` to `DiffusionConfiguration` - Renamed `Applications` to `DiffusionApplications` - `runM` function now receives `ExtraParameters` as an argument +- Configurable Mux Egress Poll Interval ## 0.20.1.0 -- 2025-03-13 diff --git a/ouroboros-network/cardano-diffusion/Cardano/Network/Diffusion.hs b/ouroboros-network/cardano-diffusion/Cardano/Network/Diffusion.hs index 00465462c42..445c34e9e6c 100644 --- a/ouroboros-network/cardano-diffusion/Cardano/Network/Diffusion.hs +++ b/ouroboros-network/cardano-diffusion/Cardano/Network/Diffusion.hs @@ -136,7 +136,7 @@ run lpci tracerChurnMode localConfig metrics tracers args apps = do (\e -> traceWith tracer (Diffusion.DiffusionErrored e) >> throwIO (Diffusion.DiffusionError e)) $ withIOManager $ \iocp -> do - interfaces <- Diffusion.mkInterfaces iocp tracer + interfaces <- Diffusion.mkInterfaces iocp tracer (Diffusion.dcEgressPollInterval args) Diffusion.runM interfaces tracers diff --git a/ouroboros-network/src/Ouroboros/Network/Diffusion.hs b/ouroboros-network/src/Ouroboros/Network/Diffusion.hs index 73c22572557..fa215c1703e 100644 --- a/ouroboros-network/src/Ouroboros/Network/Diffusion.hs +++ b/ouroboros-network/src/Ouroboros/Network/Diffusion.hs @@ -78,7 +78,7 @@ import Ouroboros.Network.Protocol.Handshake import Ouroboros.Network.RethrowPolicy import Ouroboros.Network.Server qualified as Server import Ouroboros.Network.Snocket (LocalAddress, LocalSocket (..), - localSocketFileDescriptor, makeLocalBearer, makeSocketBearer) + localSocketFileDescriptor, makeLocalBearer, makeSocketBearer') import Ouroboros.Network.Snocket qualified as Snocket import Ouroboros.Network.Socket (configureSocket, configureSystemdSocket) @@ -870,7 +870,7 @@ run extraParams tracers args apps = do >> throwIO (DiffusionError e)) $ withIOManager $ \iocp -> do - interfaces <- mkInterfaces iocp tracer + interfaces <- mkInterfaces iocp tracer (dcEgressPollInterval args) runM interfaces tracers @@ -884,6 +884,7 @@ run extraParams tracers args apps = do mkInterfaces :: IOManager -> Tracer IO (DiffusionTracer ntnAddr ntcAddr) + -> DiffTime -> IO (Interfaces Socket RemoteAddress LocalSocket @@ -891,14 +892,17 @@ mkInterfaces :: IOManager Resolver IOException IO) -mkInterfaces iocp tracer = do +mkInterfaces iocp tracer egressPollInterval = do diRng <- newStdGen diConnStateIdSupply <- atomically $ CM.newConnStateIdSupply Proxy + -- Clamp the mux egress poll interval to sane values. + let egressInterval = max 0 $ min 0.200 egressPollInterval + return $ Interfaces { diNtnSnocket = Snocket.socketSnocket iocp, - diNtnBearer = makeSocketBearer, + diNtnBearer = makeSocketBearer' egressInterval, diWithBuffer = withReadBufferIO, diNtnConfigureSocket = configureSocket, diNtnConfigureSystemdSocket = diff --git a/ouroboros-network/src/Ouroboros/Network/Diffusion/Types.hs b/ouroboros-network/src/Ouroboros/Network/Diffusion/Types.hs index e22fd7c7a2a..db03fcca4c2 100644 --- a/ouroboros-network/src/Ouroboros/Network/Diffusion/Types.hs +++ b/ouroboros-network/src/Ouroboros/Network/Diffusion/Types.hs @@ -467,6 +467,9 @@ data Configuration extraFlags m ntnFd ntnAddr ntcFd ntcAddr = Configuration { -- , dcLocalMuxForkPolicy :: Mx.ForkPolicy ntcAddr + -- | Mux egress queue's poll interval + , dcEgressPollInterval :: DiffTime + } diff --git a/ouroboros-network/testlib/Test/Ouroboros/Network/Diffusion/Node.hs b/ouroboros-network/testlib/Test/Ouroboros/Network/Diffusion/Node.hs index d4a4f69bcda..4d7154e26b3 100644 --- a/ouroboros-network/testlib/Test/Ouroboros/Network/Diffusion/Node.hs +++ b/ouroboros-network/testlib/Test/Ouroboros/Network/Diffusion/Node.hs @@ -475,6 +475,7 @@ run blockGeneratorArgs limits ni na , Diffusion.dcReadLedgerPeerSnapshot = pure Nothing -- ^ tested independently , Diffusion.dcMuxForkPolicy = noBindForkPolicy , Diffusion.dcLocalMuxForkPolicy = noBindForkPolicy + , Diffusion.dcEgressPollInterval = 0.001 } appArgs :: PeerMetrics m NtNAddr