Skip to content

Commit

Permalink
Check that messages are well-formed before enqueue
Browse files Browse the repository at this point in the history
  • Loading branch information
edsko committed Oct 20, 2024
1 parent 7a9648c commit e34ee99
Show file tree
Hide file tree
Showing 10 changed files with 63 additions and 16 deletions.
2 changes: 2 additions & 0 deletions grapesy/grapesy.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,7 @@ test-suite test-grapesy
, bytestring >= 0.10 && < 0.13
, case-insensitive >= 1.2 && < 1.3
, containers >= 0.6 && < 0.8
, deepseq >= 1.4 && < 1.6
, exceptions >= 0.10 && < 0.11
, http-types >= 0.12 && < 0.13
, http2 >= 5.3.4 && < 5.4
Expand Down Expand Up @@ -636,6 +637,7 @@ benchmark grapesy-kvstore
, base64-bytestring >= 1.2 && < 1.3
, bytestring >= 0.10 && < 0.13
, containers >= 0.6 && < 0.8
, deepseq >= 1.4 && < 1.6
, hashable >= 1.3 && < 1.5
, optparse-applicative >= 0.16 && < 0.19
, proto-lens-runtime >= 0.7 && < 0.8
Expand Down
4 changes: 3 additions & 1 deletion grapesy/kvstore/KVStore/API.hs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ module KVStore.API (
, KVStore(..)
) where

import Control.DeepSeq (NFData)
import Control.Monad
import Data.Aeson.Types qualified as Aeson
import Data.ByteString (ByteString)
Expand All @@ -25,13 +26,14 @@ newtype Key = Key {
getKey :: ByteString
}
deriving stock (Show, Eq, Ord)
deriving newtype (Hashable)
deriving newtype (Hashable, NFData)
deriving (ToJSON, FromJSON) via Base64

newtype Value = Value {
getValue :: ByteString
}
deriving stock (Show, Eq, Ord)
deriving newtype (NFData)
deriving (ToJSON, FromJSON) via Base64

{-------------------------------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion grapesy/src/Network/GRPC/Common/NextElem.hs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import Network.GRPC.Common.StreamElem (StreamElem(..))
-- | Is there a next element in a stream?
--
-- Does not record metadata, unlike 'Network.GRPC.Common.StreamElem.StreamElem'.
data NextElem a = NoNextElem | NextElem a
data NextElem a = NoNextElem | NextElem !a
deriving stock (Show, Eq, Functor, Foldable, Traversable)

{-------------------------------------------------------------------------------
Expand Down
6 changes: 3 additions & 3 deletions grapesy/src/Network/GRPC/Common/StreamElem.hs
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@ data StreamElem b a =
--
-- In this case, this element is /not/ final (and the final element, when
-- we receive it, will be tagged as 'Final').
StreamElem a
StreamElem !a

-- | We received the final element
--
-- The final element is annotated with some additional information.
| FinalElem a b
| FinalElem !a !b

-- | There are no more elements
--
Expand All @@ -59,7 +59,7 @@ data StreamElem b a =
-- * The stream didn't contain any elements at all.
-- * The final element was not marked as final.
-- See 'StreamElem' for detailed additional discussion.
| NoMoreElems b
| NoMoreElems !b
deriving stock (Show, Eq, Functor, Foldable, Traversable)

instance Bifunctor StreamElem where
Expand Down
5 changes: 4 additions & 1 deletion grapesy/src/Network/GRPC/Spec/MessageMeta.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ module Network.GRPC.Spec.MessageMeta (
, InboundMeta(..)
) where

import Control.DeepSeq (NFData)
import Data.Default
import Data.Word
import GHC.Generics (Generic)

{-------------------------------------------------------------------------------
Outbound messages
Expand All @@ -18,7 +20,8 @@ data OutboundMeta = OutboundMeta {
-- smaller message.
outboundEnableCompression :: Bool
}
deriving stock (Show)
deriving stock (Show, Generic)
deriving anyclass (NFData)

instance Default OutboundMeta where
def = OutboundMeta {
Expand Down
14 changes: 12 additions & 2 deletions grapesy/src/Network/GRPC/Spec/RPC.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ module Network.GRPC.Spec.RPC (
, defaultRpcContentType
) where

import Control.DeepSeq (NFData)
import Data.ByteString qualified as Strict (ByteString)
import Data.ByteString.Lazy qualified as Lazy
import Data.Kind
Expand Down Expand Up @@ -36,13 +37,22 @@ type family Output (rpc :: k) :: Type
-- We therefore punt on the encoding issue here, and use bytestrings. /If/
-- applications want to use non-ASCII characters, they can choose their own
-- encoding.
class ( -- Debug constraints
class ( -- Serialization
--
-- We force messages to NF before enqueueing them. This ensures that
-- if those messages contain any pure exceptions (due to a bug in a
-- client or a server), we detect the problem when the message is
-- enqueued, and can throw an appropriate exception.
NFData (Input rpc)
, NFData (Output rpc)

-- Debug constraints
--
-- For debugging it is useful when we have 'Show' instances in scope.
-- This is not that strong a requirement; after all, we must be able
-- to serialize inputs and deserialize outputs, so they must also be
-- 'Show'able.
Show (Input rpc)
, Show (Input rpc)
, Show (Output rpc)
, Show (RequestMetadata rpc)
, Show (ResponseInitialMetadata rpc)
Expand Down
18 changes: 16 additions & 2 deletions grapesy/src/Network/GRPC/Spec/RPC/JSON.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ module Network.GRPC.Spec.RPC.JSON (
, Optional(..)
) where

import Control.DeepSeq (NFData(..))
import Data.Aeson (ToJSON(..), FromJSON(..), (.=), (.:), (.:?))
import Data.Aeson qualified as Aeson
import Data.Aeson.Types qualified as Aeson
Expand Down Expand Up @@ -59,6 +60,10 @@ data JsonRpc (serv :: Symbol) (meth :: Symbol)
instance ( KnownSymbol serv
, KnownSymbol meth

-- Serialization
, NFData (Input (JsonRpc serv meth))
, NFData (Output (JsonRpc serv meth))

-- Debugging constraints
, Show (Input (JsonRpc serv meth))
, Show (Output (JsonRpc serv meth))
Expand Down Expand Up @@ -129,19 +134,28 @@ instance (Show x, Show (JsonObject fs))
. showString " :* "
. showsPrec 6 xs

instance NFData (JsonObject '[]) where
rnf JsonObject = ()

instance (NFData x, NFData (JsonObject fs))
=> NFData (JsonObject ('(f, x) : fs)) where
rnf (x :* xs) = rnf (x, xs)

-- | Required field
newtype Required a = Required {
getRequired :: a
}
deriving (Show)
deriving stock (Show)
deriving newtype (NFData)

-- | Optional field
--
-- 'Maybe' will be represented by the /absence/ of the field in the object.
newtype Optional a = Optional {
getOptional :: Maybe a
}
deriving (Show)
deriving stock (Show)
deriving newtype (NFData)

infixr 5 :*

Expand Down
8 changes: 8 additions & 0 deletions grapesy/src/Network/GRPC/Spec/RPC/Protobuf.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ module Network.GRPC.Spec.RPC.Protobuf (
, getProto
) where

import Control.DeepSeq (NFData)
import Control.Lens hiding (lens)
import Data.ByteString qualified as Strict (ByteString)
import Data.ByteString.Char8 qualified as BS.Char8
Expand Down Expand Up @@ -51,9 +52,15 @@ type instance Input (Protobuf serv meth) = Proto (MethodInput serv meth)
type instance Output (Protobuf serv meth) = Proto (MethodOutput serv meth)

instance ( HasMethodImpl serv meth

-- Debugging
, Show (MethodInput serv meth)
, Show (MethodOutput serv meth)

-- Serialization
, NFData (MethodInput serv meth)
, NFData (MethodOutput serv meth)

-- Metadata constraints
, Show (RequestMetadata (Protobuf serv meth))
, Show (ResponseInitialMetadata (Protobuf serv meth))
Expand Down Expand Up @@ -144,6 +151,7 @@ newtype Proto msg = Proto msg
, Enum
, FieldDefault
, MessageEnum
, NFData
)

-- | Field accessor for 'Proto'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ module Test.Sanity.StreamingType.CustomFormat (tests) where

import Codec.Serialise qualified as Cbor
import Control.Concurrent.Async (concurrently)
import Control.DeepSeq (NFData)
import Data.Bifunctor
import Data.ByteString qualified as Strict (ByteString)
import Data.Kind
Expand Down Expand Up @@ -59,6 +60,8 @@ data Function =
class ( Typeable fun
, Show (CalcInput fun)
, Show (CalcOutput fun)
, NFData (CalcInput fun)
, NFData (CalcOutput fun)
, Cbor.Serialise (CalcInput fun)
, Cbor.Serialise (CalcOutput fun)
) => CalculatorFunction (fun :: Function) where
Expand Down
17 changes: 11 additions & 6 deletions grapesy/util/Network/GRPC/Util/Session/Channel.hs
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@ module Network.GRPC.Util.Session.Channel (
) where

import Control.Concurrent.STM
import Control.DeepSeq (NFData, force)
import Control.Exception
import Control.Monad
import Control.Monad.Catch (ExitCase(..))
import Data.Bifunctor
import Data.ByteString.Builder (Builder)
import Data.ByteString.Lazy qualified as BS.Lazy
import GHC.Stack

-- Doesn't really matter if we import from .Client or .Server
Expand All @@ -55,7 +57,6 @@ import Network.GRPC.Util.RedundantConstraint
import Network.GRPC.Util.Session.API
import Network.GRPC.Util.Thread
import Network.GRPC.Util.Parser qualified as Parser
import Data.ByteString.Lazy qualified as BS.Lazy

{-------------------------------------------------------------------------------
Definitions
Expand Down Expand Up @@ -237,15 +238,19 @@ getInboundHeaders Channel{channelInbound} =
-- which 'StreamElem.whenDefinitelyFinal' considers to be final). Doing so will
-- result in a 'SendAfterFinal' exception.
send :: forall sess.
HasCallStack
(HasCallStack, NFData (Message (Outbound sess)))
=> Channel sess
-> StreamElem (Trailers (Outbound sess)) (Message (Outbound sess))
-> IO ()
send Channel{channelOutbound, channelSentFinal} msg =
withThreadInterface channelOutbound aux
send Channel{channelOutbound, channelSentFinal} = \msg -> do
msg' <- evaluate $ force <$> msg
withThreadInterface channelOutbound $ aux msg'
where
aux :: FlowState (Outbound sess) -> STM ()
aux st = do
aux ::
StreamElem (Trailers (Outbound sess)) (Message (Outbound sess))
-> FlowState (Outbound sess)
-> STM ()
aux msg st = do
-- By checking that we haven't sent the final message yet, we know that
-- this call to 'putMVar' will not block indefinitely: the thread that
-- sends messages to the peer will get to it eventually (unless it dies,
Expand Down

0 comments on commit e34ee99

Please sign in to comment.