Skip to content

Latest commit

 

History

History
109 lines (82 loc) · 4.04 KB

README.md

File metadata and controls

109 lines (82 loc) · 4.04 KB

Astreu4j

Astreu Java client

Maven Build Maven Release

Usage

Create Subscriber

package io.eigr.astreu.examples;

import io.eigr.astreu.Astreu;
import io.eigr.astreu.protocol.Exchange;
import io.eigr.astreu.subscriber.AcknowledgeContext;
import io.eigr.astreu.subscriber.MessageWithContext;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

class SubscriberExample {

    public static void main(final String[] args) {
        final Publisher<MessageWithContext> publisher =
                Astreu.at("127.0.0.1", 9980)
                        .asSub("test", "unique-subscription")

                        // You can use some filter options during receive messages
                        //.filter(msg -> true)
                        //.receiveOnly(MessageType.EXCHANGE)

                        //.bindWithThrottle(1, Duration.ofSeconds(1), 2);
                        .bind(); //This create a org.reactivestreams.Publisher

        // Then use with any Reactive Streams framework (build-in with Project Reactor or Akka)
        Flux.from(publisher).subscribe(messageWithContext -> {
            final AcknowledgeContext context = messageWithContext.getContext();

            //Messages can be of some types: [Exchange, Info, Failure]
            context.logger().debug("Message type is -> {}", messageWithContext.getType());
            // I am assuming it is an Exchange, but you should check this out before doing this here
            final Exchange message = messageWithContext.getMessage();

            context.logger().info("Incoming Message {}", message);

            /* Request / Response pattern is supported
            context.reply(
                    Exchange.newBuilder()
                            .setUuid(UUID.randomUUID().toString())
                            .setMessage(
                                    Any.newBuilder()
                                    .setTypeUrl("your.custom.package.type/YourTypeHere")
                                    .setValue(ByteString.copyFrom("Hello I got your message".getBytes()))
                                    .build())
                            .build());

            */

            // Or simply confirm or reject the message
            context.accept(); // Send acknowledge or reject message with context.reject()
        });
    }
}

Create Publisher

package io.eigr.astreu.examples;

import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import io.eigr.astreu.Astreu;
import io.eigr.astreu.Producer;
import io.eigr.astreu.publisher.ReplyMessage;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

import java.util.UUID;

class ProducerExample {

    public static void main(final String[] args) {

        final Producer producer =
                Astreu.at("127.0.0.1", 9980)
                .asPub("test", UUID.randomUUID().toString().toLowerCase());

        final Publisher<ReplyMessage> publisher = producer.bind(); //This create a org.reactivestreams.Publisher

        // Then use with any Reactive Streams framework (build-in with Project Reactor or Akka)
        Flux.from(publisher).subscribe(replyMessage -> {
            //Messages can be of some types: [Ack, Exchange, Info, Failure]
            replyMessage.logger().info("Reply Message -> {}", replyMessage);
        });

        for (int i = 0; i < 500000; i++) {
            producer.publish(
                    String.valueOf(i), //id of a message or use producer.publish(any) For automatic creation of UUID-based ids
                    Any.newBuilder()
                            .setTypeUrl("io.astreu.custom/Text")
                            .setValue(ByteString.copyFrom(String.format("Hello World Astreu %s", i).getBytes()))
                            .build()
            );
        }
    }
}