Skip to content

Implements a very easy to use sockets API based on IObservable

License

Notifications You must be signed in to change notification settings

mauroa/reactivesockets

 
 

Repository files navigation

reactivesockets

Implements a very easy to use sockets API based on IObservable. It allows very simple protocol implementations such as:

    var client = new ReactiveClient("127.0.0.1", 1055);
    
    // The parsing of messages is done in a simple Rx query over the receiver observable
    // Note this protocol has a fixed header part containing the payload message length
    // And the message payload itself. Bytes are consumed from the client.Receiver 
    // automatically so its behavior is intuitive.
    IObservable<string> messages = from header in client.Receiver.Buffer(4)
                                   let length = BitConverter.ToInt32(header.ToArray(), 0)
                                   let body = client.Receiver.Take(length)
                                   select Encoding.UTF8.GetString(body.ToEnumerable().ToArray());
    
    // Finally, we subscribe on a background thread to process messages when they are available
    messages.SubscribeOn(TaskPoolScheduler.Default).Subscribe(message => Console.WriteLine(message));
    client.ConnectAsync().Wait();

Creating the server implementation is equally straightforward (this is an echo server for the same message format):

        var server = new ReactiveListener(1055);
        server.Connections.Subscribe(socket =>
        {
            IObservable<string> messages = from header in socket.Receiver.Buffer(4)
                                           let length = BitConverter.ToInt32(header.ToArray(), 0)
                                           let body = socket.Receiver.Take(length)
                                           select Encoding.UTF8.GetString(body.ToEnumerable().ToArray());

            // Echo the incoming message with the same format.
            messages.Subscribe(message =>
            { 
              var body = encoding.GetBytes(message);
              var header = BitConverter.GetBytes(body.Length);
              var payload = header.Concat(body).ToArray();
              
              socket.SendAsync(payload).Wait();
            });
        });


        server.Start();

Install using: install-package reactivesockets

This library was inspired by this forum post and blog entry.

About

Implements a very easy to use sockets API based on IObservable

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages

  • C# 98.5%
  • Other 1.5%