Skip to content

eigr/astreu4j

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

49 Commits
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Astreu4j

Astreu Java client

Maven Build Maven Release

Usage

Create Subscriber

package io.eigr.astreu.examples;

import io.eigr.astreu.Astreu;
import io.eigr.astreu.protocol.Exchange;
import io.eigr.astreu.subscriber.AcknowledgeContext;
import io.eigr.astreu.subscriber.MessageWithContext;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

class SubscriberExample {

    public static void main(final String[] args) {
        final Publisher<MessageWithContext> publisher =
                Astreu.at("127.0.0.1", 9980)
                        .asSub("test", "unique-subscription")

                        // You can use some filter options during receive messages
                        //.filter(msg -> true)
                        //.receiveOnly(MessageType.EXCHANGE)

                        //.bindWithThrottle(1, Duration.ofSeconds(1), 2);
                        .bind(); //This create a org.reactivestreams.Publisher

        // Then use with any Reactive Streams framework (build-in with Project Reactor or Akka)
        Flux.from(publisher).subscribe(messageWithContext -> {
            final AcknowledgeContext context = messageWithContext.getContext();

            //Messages can be of some types: [Exchange, Info, Failure]
            context.logger().debug("Message type is -> {}", messageWithContext.getType());
            // I am assuming it is an Exchange, but you should check this out before doing this here
            final Exchange message = messageWithContext.getMessage();

            context.logger().info("Incoming Message {}", message);

            /* Request / Response pattern is supported
            context.reply(
                    Exchange.newBuilder()
                            .setUuid(UUID.randomUUID().toString())
                            .setMessage(
                                    Any.newBuilder()
                                    .setTypeUrl("your.custom.package.type/YourTypeHere")
                                    .setValue(ByteString.copyFrom("Hello I got your message".getBytes()))
                                    .build())
                            .build());

            */

            // Or simply confirm or reject the message
            context.accept(); // Send acknowledge or reject message with context.reject()
        });
    }
}

Create Publisher

package io.eigr.astreu.examples;

import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import io.eigr.astreu.Astreu;
import io.eigr.astreu.Producer;
import io.eigr.astreu.publisher.ReplyMessage;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

import java.util.UUID;

class ProducerExample {

    public static void main(final String[] args) {

        final Producer producer =
                Astreu.at("127.0.0.1", 9980)
                .asPub("test", UUID.randomUUID().toString().toLowerCase());

        final Publisher<ReplyMessage> publisher = producer.bind(); //This create a org.reactivestreams.Publisher

        // Then use with any Reactive Streams framework (build-in with Project Reactor or Akka)
        Flux.from(publisher).subscribe(replyMessage -> {
            //Messages can be of some types: [Ack, Exchange, Info, Failure]
            replyMessage.logger().info("Reply Message -> {}", replyMessage);
        });

        for (int i = 0; i < 500000; i++) {
            producer.publish(
                    String.valueOf(i), //id of a message or use producer.publish(any) For automatic creation of UUID-based ids
                    Any.newBuilder()
                            .setTypeUrl("io.astreu.custom/Text")
                            .setValue(ByteString.copyFrom(String.format("Hello World Astreu %s", i).getBytes()))
                            .build()
            );
        }
    }
}