Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xftp: fix permanent error reported as temporary from http client, wip #730

Draft
wants to merge 6 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/Simplex/FileTransfer/Agent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,8 @@ runXFTPSndPrepareWorker c doWork = do
else pure sndFile
maxRecipients <- asks (xftpMaxRecipientsPerRequest . config)
let numRecipients' = min numRecipients maxRecipients
liftIO $ print "finished encrypting"
threadDelay 10000000
-- concurrently?
forM_ (filter (not . chunkCreated) chunks) $ createChunk numRecipients'
withStore' c $ \db -> updateSndFileStatus db sndFileId SFSUploading
Expand Down Expand Up @@ -463,14 +465,17 @@ runXFTPSndWorker c srv doWork = do
`catchError` \e -> retryOnError "XFTP snd worker" (retryLoop loop e delay') (retryDone e) e
where
retryLoop loop e replicaDelay = do
liftIO $ print $ "replica " <> show sndChunkReplicaId <> " temporary error" <> show e
flip catchError (\_ -> pure ()) $ do
notifyOnRetry <- asks (xftpNotifyErrsOnRetry . config)
when notifyOnRetry $ notify c sndFileEntityId $ SFERR e
closeXFTPServerClient c userId server digest
withStore' c $ \db -> updateSndChunkReplicaDelay db sndChunkReplicaId replicaDelay
atomically $ assertAgentForeground c
loop
retryDone e = sndWorkerInternalError c sndFileId sndFileEntityId (Just filePrefixPath) (show e)
retryDone e = do
liftIO $ print $ "replica " <> show sndChunkReplicaId <> " permanent error" <> show e
sndWorkerInternalError c sndFileId sndFileEntityId (Just filePrefixPath) (show e)
uploadFileChunk :: SndFileChunk -> SndFileChunkReplica -> m ()
uploadFileChunk sndFileChunk@SndFileChunk {sndFileId, userId, chunkSpec = chunkSpec@XFTPChunkSpec {filePath}, digest = chunkDigest} replica = do
replica'@SndFileChunkReplica {sndChunkReplicaId} <- addRecipients sndFileChunk replica
Expand Down
35 changes: 23 additions & 12 deletions src/Simplex/FileTransfer/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

module Simplex.FileTransfer.Client where

import qualified Control.Exception as E
import Control.Monad.Except
import Data.Bifunctor (first)
import Data.ByteString.Builder (Builder, byteString)
Expand Down Expand Up @@ -135,23 +136,33 @@ sendXFTPCommand XFTPClient {config, http2Client = http2@HTTP2Client {sessionId}}
xftpEncodeTransmission sessionId (Just pKey) ("", fId, FileCmd (sFileParty @p) cmd)
let req = H.requestStreaming N.methodPost "/" [] $ streamBody t
reqTimeout = (\XFTPChunkSpec {chunkSize} -> chunkTimeout config chunkSize) <$> chunkSpec_
HTTP2Response {respBody = body@HTTP2Body {bodyHead}} <- liftEitherError xftpClientError $ sendRequest http2 req reqTimeout
when (B.length bodyHead /= xftpBlockSize) $ throwError $ PCEResponseError BLOCK
-- TODO validate that the file ID is the same as in the request?
(_, _, (_, _fId, respOrErr)) <- liftEither . first PCEResponseError $ xftpDecodeTransmission sessionId bodyHead
case respOrErr of
Right r -> case protocolError r of
Just e -> throwError $ PCEProtocolError e
_ -> pure (r, body)
Left e -> throwError $ PCEResponseError e
res <- liftEitherError xftpClientError $ sendRequest http2 req reqTimeout
case res of
HTTP2RequestResponse HTTP2Response {respBody = body@HTTP2Body {bodyHead}} -> do
when (B.length bodyHead /= xftpBlockSize) $ throwError $ PCEResponseError BLOCK
-- TODO validate that the file ID is the same as in the request?
(_, _, (_, _fId, respOrErr)) <- liftEither . first PCEResponseError $ xftpDecodeTransmission sessionId bodyHead
case respOrErr of
Right r -> case protocolError r of
Just e -> throwError $ PCEProtocolError e
_ -> pure (r, body)
Left e -> throwError $ PCEResponseError e
HTTP2RequestError e -> do
liftIO $ print $ "in sendXFTPCommand HTTP2RequestError" <> show e
throwError $ PCEInternalError $ show e
where
streamBody :: ByteString -> (Builder -> IO ()) -> IO () -> IO ()
streamBody t send done = do
send $ byteString t
forM_ chunkSpec_ $ \XFTPChunkSpec {filePath, chunkOffset, chunkSize} ->
withFile filePath ReadMode $ \h -> do
hSeek h AbsoluteSeek $ fromIntegral chunkOffset
sendFile h send $ fromIntegral chunkSize
do
( withFile filePath ReadMode $ \h -> do
hSeek h AbsoluteSeek $ fromIntegral chunkOffset
sendFile h send $ fromIntegral chunkSize
)
`E.catch` \(e :: SomeException) -> do
print $ "in sendXFTPCommand streamBody " <> show e
E.throw e
done

createXFTPChunk ::
Expand Down
1 change: 1 addition & 0 deletions src/Simplex/Messaging/Agent/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -702,6 +702,7 @@ protocolClientError protocolError_ host = \case
PCETransportError e -> BROKER host $ TRANSPORT e
e@PCECryptoError {} -> INTERNAL $ show e
PCEIOError {} -> BROKER host NETWORK
PCEInternalError e -> INTERNAL e

data ProtocolTestStep
= TSConnect
Expand Down
2 changes: 2 additions & 0 deletions src/Simplex/Messaging/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,8 @@ data ProtocolClientError err
PCECryptoError C.CryptoError
| -- | IO Error
PCEIOError IOException
| -- | Internal error
PCEInternalError String
deriving (Eq, Show, Exception)

type SMPClientError = ProtocolClientError ErrorType
Expand Down
1 change: 1 addition & 0 deletions src/Simplex/Messaging/Notifications/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ ntfSubscriber NtfSubscriber {smpSubscribers, newSubQ, smpAgent = ca@SMPClientAge
PCEProtocolError AUTH -> updateSubStatus smpQueue NSAuth
PCEProtocolError e -> updateErr "SMP error " e
PCEIOError e -> updateErr "IOError " e
PCEInternalError e -> updateErr "InternalError " e
PCEResponseError e -> updateErr "ResponseError " e
PCEUnexpectedResponse r -> updateErr "UnexpectedResponse " r
PCETransportError e -> updateErr "TransportError " e
Expand Down
13 changes: 8 additions & 5 deletions src/Simplex/Messaging/Notifications/Server/Push/APNS.hs
Original file line number Diff line number Diff line change
Expand Up @@ -344,11 +344,14 @@ apnsPushProviderClient c@APNSPushClient {nonceDrg, apnsCfg} tkn@NtfTknData {toke
apnsNtf <- liftEither $ first PPCryptoError $ apnsNotification tkn nonce (paddedNtfLength apnsCfg) pn
req <- liftIO $ apnsRequest c tknStr apnsNtf
-- TODO when HTTP2 client is thread-safe, we can use sendRequestDirect
HTTP2Response {response, respBody = HTTP2Body {bodyHead}} <- liftHTTPS2 $ sendRequest http2 req Nothing
let status = H.responseStatus response
reason' = maybe "" reason $ J.decodeStrict' bodyHead
logDebug $ "APNS response: " <> T.pack (show status) <> " " <> reason'
result status reason'
res <- liftHTTPS2 $ sendRequest http2 req Nothing
case res of
HTTP2RequestResponse HTTP2Response {response, respBody = HTTP2Body {bodyHead}} -> do
let status = H.responseStatus response
reason' = maybe "" reason $ J.decodeStrict' bodyHead
logDebug $ "APNS response: " <> T.pack (show status) <> " " <> reason'
result status reason'
HTTP2RequestError _e -> throwError PPPermanentError
where
result :: Maybe Status -> Text -> ExceptT PushProviderError IO ()
result status reason'
Expand Down
12 changes: 11 additions & 1 deletion src/Simplex/Messaging/Transport/HTTP2.hs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Simplex.Messaging.Transport.HTTP2 where

Expand All @@ -23,7 +24,16 @@ defaultHTTP2BufferSize :: BufferSize
defaultHTTP2BufferSize = 32768

withHTTP2 :: BufferSize -> (Config -> SessionId -> IO a) -> TLS -> IO a
withHTTP2 sz run c = E.bracket (allocHTTP2Config c sz) freeSimpleConfig (`run` tlsUniq c)
withHTTP2 sz run c =
E.bracket
(allocHTTP2Config c sz)
freeSimpleConfig
( \cfg ->
run cfg (tlsUniq c) `E.catch` \(e :: E.SomeException) ->
do
print $ "withHTTP2 e: " <> show e
E.throwIO e
)

allocHTTP2Config :: TLS -> BufferSize -> IO Config
allocHTTP2Config c sz = do
Expand Down
44 changes: 27 additions & 17 deletions src/Simplex/Messaging/Transport/HTTP2/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ import UnliftIO.STM
import UnliftIO.Timeout

data HTTP2Client = HTTP2Client
{ action :: Maybe (Async HTTP2Response),
{ action :: Maybe (Async HTTP2RequestResult),
sessionId :: SessionId,
sessionTs :: UTCTime,
sendReq :: Request -> (Response -> IO HTTP2Response) -> IO HTTP2Response,
sendReq :: Request -> (Response -> IO HTTP2RequestResult) -> IO HTTP2RequestResult,
client_ :: HClient
}

Expand All @@ -42,9 +42,13 @@ data HClient = HClient
host :: TransportHost,
port :: ServiceName,
config :: HTTP2ClientConfig,
reqQ :: TBQueue (Request, TMVar HTTP2Response)
reqQ :: TBQueue (Request, TMVar HTTP2RequestResult)
}

data HTTP2RequestResult
= HTTP2RequestResponse HTTP2Response
| HTTP2RequestError E.SomeException

data HTTP2Response = HTTP2Response
{ response :: Response,
respBody :: HTTP2Body
Expand Down Expand Up @@ -101,7 +105,7 @@ getVerifiedHTTP2Client proxyUsername host port keyHash caStore config@HTTP2Clien
Just (Left e) -> Left e
Nothing -> Left HCNetworkError

client :: HClient -> TMVar (Either HTTP2ClientError HTTP2Client) -> SessionId -> H.Client HTTP2Response
client :: HClient -> TMVar (Either HTTP2ClientError HTTP2Client) -> SessionId -> H.Client HTTP2RequestResult
client c cVar sessionId sendReq = do
sessionTs <- getCurrentTime
let c' = HTTP2Client {action = Nothing, client_ = c, sendReq, sessionId, sessionTs}
Expand All @@ -110,28 +114,34 @@ getVerifiedHTTP2Client proxyUsername host port keyHash caStore config@HTTP2Clien
putTMVar cVar (Right c')
process c' sendReq `E.finally` disconnected

process :: HTTP2Client -> H.Client HTTP2Response
process :: HTTP2Client -> H.Client HTTP2RequestResult
process HTTP2Client {client_ = HClient {reqQ}} sendReq = forever $ do
(req, respVar) <- atomically $ readTBQueue reqQ
sendReq req $ \r -> do
respBody <- getHTTP2Body r bodyHeadSize
let resp = HTTP2Response {response = r, respBody}
atomically $ putTMVar respVar resp
pure resp
(req, resVar) <- atomically $ readTBQueue reqQ
sendReq req (processResp resVar) `E.catch` \e -> do
liftIO $ print $ "in process catch e: " <> show e
let res = HTTP2RequestError e
atomically $ putTMVar resVar res
pure res
where
processResp resVar r = do
respBody <- getHTTP2Body r bodyHeadSize
let res = HTTP2RequestResponse $ HTTP2Response {response = r, respBody}
atomically $ putTMVar resVar res
pure res

-- | Disconnects client from the server and terminates client threads.
closeHTTP2Client :: HTTP2Client -> IO ()
closeHTTP2Client = mapM_ uninterruptibleCancel . action

sendRequest :: HTTP2Client -> Request -> Maybe Int -> IO (Either HTTP2ClientError HTTP2Response)
sendRequest :: HTTP2Client -> Request -> Maybe Int -> IO (Either HTTP2ClientError HTTP2RequestResult)
sendRequest HTTP2Client {client_ = HClient {config, reqQ}} req reqTimeout_ = do
resp <- newEmptyTMVarIO
atomically $ writeTBQueue reqQ (req, resp)
resVar <- newEmptyTMVarIO
atomically $ writeTBQueue reqQ (req, resVar)
let reqTimeout = http2RequestTimeout config reqTimeout_
maybe (Left HCResponseTimeout) Right <$> (reqTimeout `timeout` atomically (takeTMVar resp))
maybe (Left HCResponseTimeout) Right <$> (reqTimeout `timeout` atomically (takeTMVar resVar))

-- | this function should not be used until HTTP2 is thread safe, use sendRequest
sendRequestDirect :: HTTP2Client -> Request -> Maybe Int -> IO (Either HTTP2ClientError HTTP2Response)
sendRequestDirect :: HTTP2Client -> Request -> Maybe Int -> IO (Either HTTP2ClientError HTTP2RequestResult)
sendRequestDirect HTTP2Client {client_ = HClient {config, disconnected}, sendReq} req reqTimeout_ = do
let reqTimeout = http2RequestTimeout config reqTimeout_
reqTimeout `timeout` try (sendReq req process) >>= \case
Expand All @@ -141,7 +151,7 @@ sendRequestDirect HTTP2Client {client_ = HClient {config, disconnected}, sendReq
where
process r = do
respBody <- getHTTP2Body r $ bodyHeadSize config
pure HTTP2Response {response = r, respBody}
pure $ HTTP2RequestResponse $ HTTP2Response {response = r, respBody}

http2RequestTimeout :: HTTP2ClientConfig -> Maybe Int -> Int
http2RequestTimeout HTTP2ClientConfig {connTimeout} = maybe connTimeout (connTimeout +)
Expand Down