Skip to content

Commit

Permalink
fixup readbuffer
Browse files Browse the repository at this point in the history
  • Loading branch information
karknu committed Mar 10, 2025
1 parent 71993fb commit d467d9c
Showing 1 changed file with 35 additions and 36 deletions.
71 changes: 35 additions & 36 deletions network-mux/src/Network/Mux/Bearer/Socket.hs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ socketAsBearer sduSize batchSize readBuffer_m sduTimeout tracer sd =

case readBuffer_m of
Nothing -> -- No read buffer available; read directly from socket
recvFromSocket l
recvFromSocket waitingOnNxtHeader l
Just Mx.ReadBuffer{..} -> do
availableData <- atomically $ do
buf <- readTVar rbVar
Expand All @@ -127,54 +127,53 @@ socketAsBearer sduSize batchSize readBuffer_m sduTimeout tracer sd =
-- Don't let the kernel wake us up until there is
-- at least l bytes of data.
Socket.setSocketOption sd Socket.RecvLowWater $ fromIntegral l
newBuf <- recvFromSocket $ fromIntegral rbSize
newBuf <- recvFromSocket waitingOnNxtHeader $ fromIntegral rbSize
atomically $ modifyTVar rbVar (`BL.append` newBuf)
when (not waitingOnNxtHeader) $
Socket.setSocketOption sd Socket.RecvLowWater 1
recvAtMost waitingOnNxtHeader l
else do
traceWith tracer $ Mx.TraceRecvEnd $ fromIntegral $ BL.length availableData
return availableData
where
#if !defined(mingw32_HOST_OS)
-- Read at most `min rbSize maxLen` bytes from the socket
-- into rbBuf.
-- Creates and returns a Bytestring matching the exact size
-- of the number of bytes read.
recvBuf :: Mx.ReadBuffer IO -> Int64 -> IO BL.ByteString
recvBuf Mx.ReadBuffer{..} maxLen = do
len <- Socket.recvBuf sd rbBuf (min rbSize $ fromIntegral maxLen)
traceWith tracer $ Mx.TraceRecvRaw len
if len > 0
then do
bs <- create len (\dest -> copyBytes dest rbBuf len)
return $ BL.fromStrict bs
else return $ BL.empty
-- Read at most `min rbSize maxLen` bytes from the socket
-- into rbBuf.
-- Creates and returns a Bytestring matching the exact size
-- of the number of bytes read.
recvBuf :: Mx.ReadBuffer IO -> Int64 -> IO BL.ByteString
recvBuf Mx.ReadBuffer{..} maxLen = do
len <- Socket.recvBuf sd rbBuf (min rbSize $ fromIntegral maxLen)
traceWith tracer $ Mx.TraceRecvRaw len
if len > 0
then do
bs <- create len (\dest -> copyBytes dest rbBuf len)
return $ BL.fromStrict bs
else return $ BL.empty
#endif

recvFromSocket :: Int64 -> IO BL.ByteString
recvFromSocket len = do
recvFromSocket :: Bool -> Int64 -> IO BL.ByteString
recvFromSocket waitingOnNxtHeader len = do
#if defined(mingw32_HOST_OS)
buf <- Win32.Async.recv sd (fromIntegral len)
buf <- Win32.Async.recv sd (fromIntegral len)
#else
buf <- (case readBuffer_m of
Nothing -> Socket.recv sd len
Just readBuffer -> recvBuf readBuffer len
)
buf <- (case readBuffer_m of
Nothing -> Socket.recv sd len
Just readBuffer -> recvBuf readBuffer len
)
#endif
`catch` Mx.handleIOException "recv errored"
if BL.null buf
then do
when waitingOnNxtHeader $
{- This may not be an error, but could be an orderly shutdown.
- We wait 1 seconds to give the mux protocols time to perform
- a clean up and exit.
-}
threadDelay 1
throwIO $ Mx.BearerClosed (show sd ++
" closed when reading data, waiting on next header " ++
show waitingOnNxtHeader)
else return buf
`catch` Mx.handleIOException "recv errored"
if BL.null buf
then do
when waitingOnNxtHeader $
{- This may not be an error, but could be an orderly shutdown.
- We wait 1 seconds to give the mux protocols time to perform
- a clean up and exit.
-}
threadDelay 1
throwIO $ Mx.BearerClosed (show sd ++
" closed when reading data, waiting on next header " ++
show waitingOnNxtHeader)
else return buf

writeSocket :: Mx.TimeoutFn IO -> Mx.SDU -> IO Time
writeSocket timeout sdu = do
Expand Down

0 comments on commit d467d9c

Please sign in to comment.