Skip to content

Commit

Permalink
ARTEMIS-4873 Add an example showing AMQP federation in pull mode
Browse files Browse the repository at this point in the history
Show how to configure a policy to only pull a batch of messages when there
is no local pending messages awaiting consumption.
  • Loading branch information
tabish121 committed Jun 27, 2024
1 parent 0c29740 commit 7c18a4c
Show file tree
Hide file tree
Showing 7 changed files with 495 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>org.apache.activemq.examples.broker-connection</groupId>
<artifactId>broker-connections</artifactId>
<version>2.36.0-SNAPSHOT</version>
</parent>

<artifactId>amqp-federation-queue-pull-messages</artifactId>
<packaging>jar</packaging>
<name>amqp-federation Queue Federation with pull configuration</name>

<properties>
<activemq.basedir>${project.basedir}/../../../..</activemq.basedir>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-jms-client</artifactId>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-maven-plugin</artifactId>
<executions>
<execution>
<id>create1</id>
<goals>
<goal>create</goal>
</goals>
<configuration>
<ignore>${noServer}</ignore>
<instance>${basedir}/target/server1</instance>
<allowAnonymous>true</allowAnonymous>
<configuration>${basedir}/target/classes/activemq/server1</configuration>
<!-- this makes it easier in certain envs -->
<javaOptions>-Djava.net.preferIPv4Stack=true</javaOptions>
</configuration>
</execution>
<execution>
<id>create2</id>
<goals>
<goal>create</goal>
</goals>
<configuration>
<ignore>${noServer}</ignore>
<instance>${basedir}/target/server2</instance>
<allowAnonymous>true</allowAnonymous>
<configuration>${basedir}/target/classes/activemq/server2</configuration>
<!-- this makes it easier in certain envs -->
<javaOptions>-Djava.net.preferIPv4Stack=true</javaOptions>
</configuration>
</execution>
<!-- we first start broker 1, to avoid reconnecting statements -->
<execution>
<id>start1</id>
<goals>
<goal>cli</goal>
</goals>
<configuration>
<ignore>${noServer}</ignore>
<spawn>true</spawn>
<location>${basedir}/target/server1</location>
<testURI>tcp://localhost:5660</testURI>
<args>
<param>run</param>
</args>
<name>server1</name>
</configuration>
</execution>
<execution>
<id>start2</id>
<goals>
<goal>cli</goal>
</goals>
<configuration>
<spawn>true</spawn>
<ignore>${noServer}</ignore>
<location>${basedir}/target/server2</location>
<testURI>tcp://localhost:5770</testURI>
<args>
<param>run</param>
</args>
<name>server2</name>
</configuration>
</execution>
<execution>
<id>runClient</id>
<goals>
<goal>runClient</goal>
</goals>
<configuration>
<!-- you may have to set export MAVEN_OPTS="-Djava.net.preferIPv4Stack=true"
if you are on MacOS for instance -->
<clientClass>org.apache.activemq.artemis.jms.example.BrokerFederationExample</clientClass>
</configuration>
</execution>
<execution>
<id>stop2</id>
<goals>
<goal>stop</goal>
</goals>
<configuration>
<ignore>${noServer}</ignore>
<location>${basedir}/target/server2</location>
</configuration>
</execution>
<execution>
<id>stop1</id>
<goals>
<goal>stop</goal>
</goals>
<configuration>
<ignore>${noServer}</ignore>
<location>${basedir}/target/server1</location>
</configuration>
</execution>
</executions>
<dependencies>
<dependency>
<groupId>org.apache.activemq.examples.broker-connection</groupId>
<artifactId>amqp-federation-queue-pull-messages</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-clean-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# AMQP Broker Connection example with Federation of Queue messages using pull mode

If you have not already done so, [prepare the broker distribution](../../../../README.md#getting-started) before running the example.

To run the example, simply type **mvn verify** from this directory, or **mvn -PnoServer verify** if you want to create and start the broker manually.

This example demonstrates the use of queue federation with configuration to only pull messages from one broker to another when there is no backlog on the broker pulling messages
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jms.example;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.qpid.jms.JmsConnectionFactory;

/**
* This example is demonstrating how messages are federated two brokers with Queue federation
* configured such that messages are pulled only when a queue on the target broker has no pending
* messages with a batch size of 1.
*/
public class BrokerFederationExample {

public static void main(final String[] args) throws Exception {
// Consumers are pull consumers so message stay on the server until a receive call is made.
final ConnectionFactory connectionFactoryServer1 = new JmsConnectionFactory("amqp://localhost:5660?jms.prefetchPolicy.all=0");
final ConnectionFactory connectionFactoryServer2 = new JmsConnectionFactory("amqp://localhost:5770?jms.prefetchPolicy.all=0");

final Connection connectionOnServer1 = connectionFactoryServer1.createConnection();
final Connection connectionOnServer2 = connectionFactoryServer2.createConnection();

connectionOnServer1.start();
connectionOnServer2.start();

final Session sessionOnServer1 = connectionOnServer1.createSession(Session.AUTO_ACKNOWLEDGE);
final Session sessionOnServer2 = connectionOnServer2.createSession(Session.AUTO_ACKNOWLEDGE);

final Queue trackingQueue = sessionOnServer2.createQueue("tracking");

final MessageConsumer consumerOn1 = sessionOnServer1.createConsumer(trackingQueue);
final MessageProducer producerOn1 = sessionOnServer1.createProducer(trackingQueue);
final MessageProducer producerOn2 = sessionOnServer2.createProducer(trackingQueue);

Thread.sleep(3_000); // Allow federation connections to build

final TextMessage messageSent1 = sessionOnServer2.createTextMessage("message #1");
final TextMessage messageSent2 = sessionOnServer2.createTextMessage("message #2");
final TextMessage messageSent3 = sessionOnServer2.createTextMessage("message #3");

producerOn2.send(messageSent1); // Should get pulled to server 1
producerOn2.send(messageSent2);
producerOn2.send(messageSent3);

final MessageConsumer consumerOn2 = sessionOnServer2.createConsumer(trackingQueue);

// These messages sent to server 2 should stay there as server 1 now has backlog
final TextMessage receivedFromC2 = (TextMessage) consumerOn2.receive(10_000);
final TextMessage receivedFromC3 = (TextMessage) consumerOn2.receive(10_000);

System.out.println("Consumer on server 2 received message: " + receivedFromC2.getText());
System.out.println("Consumer on server 2 received message: " + receivedFromC3.getText());

final TextMessage receivedFromC1 = (TextMessage) consumerOn1.receive(10_000);

System.out.println("Consumer on server 1 received message: " + receivedFromC1.getText());

// Now create local backlog on server 1 which should prevent any messages being federated
// from server 2 and the local consumer on server 2 should be able to receive them

final TextMessage messageSent4 = sessionOnServer1.createTextMessage("message #4");
final TextMessage messageSent5 = sessionOnServer1.createTextMessage("message #5");
final TextMessage messageSent6 = sessionOnServer1.createTextMessage("message #6");

producerOn1.send(messageSent4);
producerOn1.send(messageSent5);
producerOn1.send(messageSent6);

// These should stay local to server 2 and be consumed when pulled by the consumer on 2

final TextMessage messageSent7 = sessionOnServer2.createTextMessage("message #7");
final TextMessage messageSent8 = sessionOnServer2.createTextMessage("message #8");
final TextMessage messageSent9 = sessionOnServer2.createTextMessage("message #9");

producerOn2.send(messageSent7);
producerOn2.send(messageSent8);
producerOn2.send(messageSent9);

final TextMessage receivedFromC7 = (TextMessage) consumerOn2.receive(10_000);
final TextMessage receivedFromC8 = (TextMessage) consumerOn2.receive(10_000);
final TextMessage receivedFromC9 = (TextMessage) consumerOn2.receive(10_000);

System.out.println("Consumer on server 2 received message: " + receivedFromC7.getText());
System.out.println("Consumer on server 2 received message: " + receivedFromC8.getText());
System.out.println("Consumer on server 2 received message: " + receivedFromC9.getText());

connectionOnServer1.close();
connectionOnServer2.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:xi="http://www.w3.org/2001/XInclude"
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">

<core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq:core ">

<name>Server1</name>

<persistence-enabled>false</persistence-enabled>

<journal-type>NIO</journal-type>

<!-- should the broker detect dead locks and other issues -->
<critical-analyzer>true</critical-analyzer>

<critical-analyzer-timeout>120000</critical-analyzer-timeout>

<critical-analyzer-check-period>60000</critical-analyzer-check-period>

<critical-analyzer-policy>HALT</critical-analyzer-policy>

<page-sync-timeout>44000</page-sync-timeout>

<acceptors>
<!-- Acceptor for every supported protocol -->
<acceptor name="artemis">tcp://0.0.0.0:5660?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true</acceptor>
</acceptors>

<broker-connections>
<amqp-connection uri="tcp://localhost:5770" name="federation-example-server-1" retry-interval="1000">
<federation>
<local-queue-policy name="queue-federation-from-server-2">
<include address-match="#" queue-match="tracking" />
<property key="amqpCredits" value="0" />
<property key="amqpPullConsumerCredits" value="1" />
</local-queue-policy>
</federation>
</amqp-connection>
</broker-connections>

<security-settings>
<security-setting match="#">
<permission type="createNonDurableQueue" roles="guest"/>
<permission type="deleteNonDurableQueue" roles="guest"/>
<permission type="createDurableQueue" roles="guest"/>
<permission type="deleteDurableQueue" roles="guest"/>
<permission type="createAddress" roles="guest"/>
<permission type="deleteAddress" roles="guest"/>
<permission type="consume" roles="guest"/>
<permission type="browse" roles="guest"/>
<permission type="send" roles="guest"/>
<permission type="manage" roles="guest"/>
</security-setting>
</security-settings>

<address-settings>
<!-- if you define auto-create on certain queues, management has to be auto-create -->
<address-setting match="activemq.management#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
</address-setting>
<!--default for catch all-->
<address-setting match="#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
</address-setting>
</address-settings>

<addresses>
<address name="tracking">
<anycast>
<queue name="tracking" />
</anycast>
</address>
</addresses>

</core>
</configuration>
Loading

0 comments on commit 7c18a4c

Please sign in to comment.