ViennaNET.Messaging


Assembly with classes and interfaces providing common functionality for working with queues


The principle of operation and use of the assembly

The assembly contains the basic interfaces and implementations for working with abstract queues. It is extended by adapters for specific queues, which ship as separate assemblies. The assembly itself contains no information about the specific queues used.


Sending and receiving messages

To use the library, your assembly must define the message classes it works with and, if it listens to a queue continuously, implementations of the IMessageProcessor interface. To send messages or receive them on demand, inject the IMessagingComponentFactory dependency. It is a component factory, and the objects it returns implement the IDisposable interface.

Example of receiving a message:

    using (var receiver = _messagingComponentFactory.CreateMessageReceiver<TestMessage>("testQueue"))
    {
        var message = receiver.Receive();
    }

Example of sending a message:

    using (var sender = _messagingComponentFactory.CreateMessageSender<TestMessage>("testQueue"))
    {
        sender.SendMessage(new TestMessage {Value = message});
    }

To listen to a queue, inject the IQueueReactorFactory dependency. In the class that receives the dependency, register the IMessageProcessor message handlers in the factory, then use it to create an IQueueReactor instance and start listening:

    private readonly IQueueReactor _queueReactor;

    public TestClass(IQueueReactorFactory queueReactorFactory)
    {
      // Register the handler before creating the reactor for the same queue.
      queueReactorFactory.Register<TestMessageProcessor>("testQueue");
      _queueReactor = queueReactorFactory.CreateQueueReactor("testQueue");
      _queueReactor.StartProcessing();
    }

Example of a message handler class and message class:

    public class TestMessageProcessor : IMessageProcessor
    {
        public bool Process(BaseMessage message)
        {
            // Restore the typed message from the Body of the BaseMessage.
            var deserializer = new XmlMessageSerializer<TestMessage>();
            var msg = deserializer.Deserialize(message);
            Logger.LogDebug(msg.Value);
            return true;
        }
    }

    [Serializable]
    public class TestMessage
    {
        public string Value { get; set; }
    }

In the IoC container installer of the assembly that uses the queue libraries, register XmlMessageSerializer as the implementation of the IMessageSerializer and IMessageDeserializer interfaces for each message type used in your business logic. Registration example:

    public void RegisterServices(Container container)
    {
      container.Collection.Append<IMessageSerializer, XmlMessageSerializer<TestMessage>>(Lifestyle.Singleton);
      container.Collection.Append<IMessageDeserializer, XmlMessageSerializer<TestMessage>>(Lifestyle.Singleton);
    }




Assembly composition


Configuration


Factory
  • Interfaces:
    • IMessagingConfigurationFactory - Creates a configuration for working with queues.
  • Classes:
    • MessagingConfigurationFactory - Creates a configuration for working with queues.

Configuration sections
  • MessagingConfiguration - Configuration section. It contains the information the library needs. Section in the configuration file:
    "messaging": {
        "ApplicationName": "<name of the application handling the queues>"
    }
  • QueueConfigurationBase - The base class of the configuration section. It contains the necessary information to work with all types of queues.

  • CustomHeader - Configuration section. Contains additional headers for working with queues, in key-value format. Section in the configuration file:

        "<queue type name>": [{
            ...
            "customHeaders": {
                "values": [
                    {"key": "<header name>", "value": "<header value>"}
                ]
            }
        }]

Enumerations Used in the Configuration
  • MessageProcessingType - the way the queue is listened to. Values:
    • ThreadStrategy - polls the queue in an infinite loop on a single thread.
    • Subscribe - subscribes to queue events.
    • SubscribeAndReply - subscribes to queue events and can send a reply.

Messages
  • BaseMessage - The base class for messages sent to the queue. Any message that carries data must be serialized into the Body field and deserialized from it. Properties:

    • MessageId - message identifier.
    • CorrelationId - correlation identifier.
    • ApplicationTitle - the name of the application that sent the message.
    • SendDate - date and time of sending the message.
    • ReceiveDate - date and time of receipt of the message.
    • LifeTime - message lifetime; the default value, TimeSpan.Zero, means unlimited.
    • Body - message body.
    • Properties - dictionary of additional message properties, by default - an empty dictionary.
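
To make the Body round trip concrete, here is a minimal sketch that relies only on System.Xml.Serialization from .NET. SketchMessage is a hypothetical stand-in that mirrors a few of the properties listed above (the property types are assumptions), and BodyRoundTrip is an illustrative helper; neither is part of the library, whose own serializers are described in the next section.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Xml.Serialization;

// Payload type with the same shape as TestMessage in the examples above.
public class TestMessage
{
    public string Value { get; set; }
}

// Hypothetical stand-in for BaseMessage; the property types are assumptions.
public class SketchMessage
{
    public string MessageId { get; set; }
    public string CorrelationId { get; set; }
    public TimeSpan LifeTime { get; set; } = TimeSpan.Zero;
    public string Body { get; set; }
    public Dictionary<string, object> Properties { get; } = new Dictionary<string, object>();
}

public static class BodyRoundTrip
{
    // Serializes the payload to XML and stores the result in Body.
    public static SketchMessage Wrap(TestMessage payload)
    {
        var serializer = new XmlSerializer(typeof(TestMessage));
        using (var writer = new StringWriter())
        {
            serializer.Serialize(writer, payload);
            return new SketchMessage
            {
                MessageId = Guid.NewGuid().ToString(),
                Body = writer.ToString()
            };
        }
    }

    // Reads the typed payload back from Body.
    public static TestMessage Unwrap(SketchMessage message)
    {
        var serializer = new XmlSerializer(typeof(TestMessage));
        using (var reader = new StringReader(message.Body))
        {
            return (TestMessage)serializer.Deserialize(reader);
        }
    }
}
```

Calling BodyRoundTrip.Unwrap(BodyRoundTrip.Wrap(payload)) yields a TestMessage with the same Value, which is the contract the Body field is meant to uphold.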


Message serialization / deserialization

  • Interfaces:
    • IMessageSerializer / IMessageSerializer<TMessage> - Serializes the original message into the Body field of a BaseMessage instance and returns that instance.
    • IMessageDeserializer / IMessageDeserializer<TMessage> - Deserializes the Body field of a BaseMessage instance into the message type intended for further use.
  • Classes:
    • PlainTextSerializer - serializer/deserializer for plain text.
    • XmlMessageSerializer - XML serializer/deserializer. It supports validation against an XSD schema; to enable it, set the xsd field with the SetStream(Stream xsd) or AddStream(Stream xsd) methods.
    • XmlResourcesMessageSerializer - XML serializer/deserializer. It supports validation against an XSD schema; to enable it, set the xsd field through a class derived from ResourceStorage.
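
For readers unfamiliar with schema validation, the sketch below shows the general technique using only the standard System.Xml and System.Xml.Schema APIs. It does not show how XmlMessageSerializer is implemented internally; XsdValidationSketch and its Validate method are hypothetical names used for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Xml;
using System.Xml.Schema;

public static class XsdValidationSketch
{
    // Returns the validation errors found in xml against the schema read from xsd.
    public static List<string> Validate(string xml, Stream xsd)
    {
        var errors = new List<string>();

        var schemas = new XmlSchemaSet();
        using (var schemaReader = XmlReader.Create(xsd))
        {
            // Passing null keeps the targetNamespace declared in the schema itself.
            schemas.Add(null, schemaReader);
        }

        var settings = new XmlReaderSettings
        {
            ValidationType = ValidationType.Schema,
            Schemas = schemas
        };
        // Collect errors instead of throwing on the first one.
        settings.ValidationEventHandler += (sender, e) => errors.Add(e.Message);

        using (var reader = XmlReader.Create(new StringReader(xml), settings))
        {
            // Validation runs as a side effect of reading the document.
            while (reader.Read()) { }
        }

        return errors;
    }
}
```

An empty result means the document conforms to the schema. Collecting messages in a list rather than throwing makes it easy to log every problem in a rejected message at once.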


Queue connection classes and the factories that create them

  • Interfaces:

    • IMessageAdapter - Proxy to the queue.

      IMessageAdapter is implemented in separate assemblies for each individual queue type, since its specific implementations depend on libraries that provide queue APIs.

      • IMessageAdapterWithSubscribing - Proxy to the queue that, in addition to the IMessageAdapter functionality, lets client classes subscribe to queue events.

        IMessageAdapterWithSubscribing is implemented in separate assemblies for each individual queue type, since its specific implementations depend on the libraries that provide the queue API.

      • IMessageAdapterWithTransactions - Proxy to the queue that, in addition to the IMessageAdapter functionality, supports transactional work with the queue.

        IMessageAdapterWithTransactions is implemented in separate assemblies for each individual queue type, since its specific implementations depend on the libraries that provide the queue API.

      • IMessageAdapterConstructor - Creates IMessageAdapter instances for a given queue type.

        IMessageAdapterConstructor is implemented in separate assemblies for each individual queue type, since its specific implementations depend on the libraries that provide the queue API.

      • IMessageAdapterFactory - Creates IMessageAdapter instances by the queue name defined in the configuration file.

    • IMessageReceiver - Receives messages from the queue.

    • IMessageSender - Sends messages to the queue.

    • ISerializedMessageSender - Serializes messages and then sends them to the queue.

    • IMessagingComponentFactory - Creates message sender and receiver instances by the queue name defined in the configuration file.

  • Classes:

    • MessageAdapterFactory - Factory for IMessageAdapter instances. Creates them by the queue name defined in the configuration file. Contains a collection of IMessageAdapterConstructor instances that it consults when an adapter is instantiated.

      For one type of queue, only one IMessageAdapterConstructor implementation can be defined. Otherwise, a CantFindAdapterConstructorException will be thrown.

    • MessagingComponentFactory - Factory for IMessageReceiver and IMessageSender instances. Creates them by the queue name defined in the configuration file. Contains collections of IMessageSerializer and IMessageDeserializer instances that it consults when a component is instantiated.

      Only one IMessageReceiver / ISerializedMessageSender implementation can be defined for a single queue name. Otherwise, a CantFindSerializerOrDeserializerException will be thrown.

    • MessageReceiver - Message receiver.

    • MessageSender - Message sender.

    • SerializedMessageSender - Message sender that serializes messages before sending.
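
The "exactly one constructor" rule described for MessageAdapterFactory can be pictured with the following sketch. Every type here (ISketchAdapter, ISketchAdapterConstructor, FixedQueuesConstructor, SketchAdapterFactory) is a hypothetical stand-in, the lookup is simplified to a queue name, and a standard InvalidOperationException takes the place of the library's CantFindAdapterConstructorException. The real interfaces and lookup logic may differ.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-ins; the real library interfaces have other members.
public interface ISketchAdapter
{
    string QueueId { get; }
}

public interface ISketchAdapterConstructor
{
    bool HasQueue(string queueId);
    ISketchAdapter Create(string queueId);
}

// Simple constructor that knows a fixed set of queue names.
public sealed class FixedQueuesConstructor : ISketchAdapterConstructor
{
    private sealed class Adapter : ISketchAdapter
    {
        public string QueueId { get; set; }
    }

    private readonly HashSet<string> _queues;

    public FixedQueuesConstructor(params string[] queues)
    {
        _queues = new HashSet<string>(queues);
    }

    public bool HasQueue(string queueId) => _queues.Contains(queueId);

    public ISketchAdapter Create(string queueId) => new Adapter { QueueId = queueId };
}

public sealed class SketchAdapterFactory
{
    private readonly List<ISketchAdapterConstructor> _constructors;

    public SketchAdapterFactory(IEnumerable<ISketchAdapterConstructor> constructors)
    {
        _constructors = constructors.ToList();
    }

    public ISketchAdapter Create(string queueId)
    {
        var matches = _constructors.Where(c => c.HasQueue(queueId)).ToList();
        if (matches.Count != 1)
        {
            // Stand-in for the library's CantFindAdapterConstructorException.
            throw new InvalidOperationException(
                $"Expected exactly one constructor for '{queueId}', found {matches.Count}.");
        }

        return matches[0].Create(queueId);
    }
}
```

Failing fast on both zero and multiple matches keeps a misconfigured container from silently picking an arbitrary adapter.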



Listening to Queues

  • Interfaces:

    • IQueueReactor - An in-memory process that listens to a queue.
    • IQueueReactorFactory - Creates instances of IQueueReactor.
    • IPolling - Drives IQueueReactor by polling the queue on a thread.
    • IMessageProcessorRegister - Registers IMessageProcessor message handlers.
    • IMessageProcessor - Processes messages when they are received.

      IMessageProcessor implementations are defined in the assemblies where the messages are actually processed.

  • Classes:

    • QueueReactorFactory - Creates IQueueReactor instances by queue name from the collections of IMessageConstructor and IMessageProcessor instances, and also registers IMessageProcessor handlers.

      Queues and the types implementing IMessageProcessor are registered in this class. An attempt to register a processor type that is already registered in the factory throws a MessageProcessorAlreadyRegisterException.

  • Features:

    • When the serviceHealthDependent configuration flag is set to true, the current IQueueReactor subscribes to events raised by the root IHealthCheckingService diagnostic service. This lets it disconnect from the queue quickly if the service diagnostics fail.
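
The duplicate-registration rule described for QueueReactorFactory above can be sketched as follows. ISketchProcessor and SketchProcessorRegister are hypothetical stand-ins, and InvalidOperationException takes the place of the library's MessageProcessorAlreadyRegisterException; the sketch assumes that the check is made per processor type, as the description suggests.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for IMessageProcessor.
public interface ISketchProcessor
{
    bool Process(string body);
}

public sealed class SketchProcessorRegister
{
    private readonly HashSet<Type> _registered = new HashSet<Type>();
    private readonly Dictionary<string, List<Type>> _byQueue = new Dictionary<string, List<Type>>();

    // Binds a processor type to a queue; each type may be registered only once.
    public void Register<TProcessor>(string queueId) where TProcessor : ISketchProcessor
    {
        if (!_registered.Add(typeof(TProcessor)))
        {
            // Stand-in for the library's MessageProcessorAlreadyRegisterException.
            throw new InvalidOperationException(
                $"Processor {typeof(TProcessor).Name} is already registered.");
        }

        if (!_byQueue.TryGetValue(queueId, out var types))
        {
            types = new List<Type>();
            _byQueue[queueId] = types;
        }

        types.Add(typeof(TProcessor));
    }

    // Returns the processor types bound to the queue (empty if none).
    public IReadOnlyList<Type> GetProcessorTypes(string queueId)
    {
        return _byQueue.TryGetValue(queueId, out var types) ? types : new List<Type>();
    }
}

// Minimal processor used to exercise the register.
public sealed class EchoProcessor : ISketchProcessor
{
    public bool Process(string body) => !string.IsNullOrEmpty(body);
}
```

With this register, a second call to Register<EchoProcessor> fails regardless of the queue name, so each handler type ends up bound to a single queue.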