Skip to content

Commit

Permalink
Event Hubs Samples (#4053)
Browse files Browse the repository at this point in the history
* Add hello world sample.

* Adding ReceiveEvent sample.

* Adding samples to the correct aggregate report.

* Adding sample for getting Event Hub metadata
  • Loading branch information
conniey authored and sima-zhu committed Jun 27, 2019
1 parent 5f19cc0 commit 7c5ac25
Show file tree
Hide file tree
Showing 5 changed files with 201 additions and 1 deletion.
3 changes: 2 additions & 1 deletion eng/spotbugs-aggregate-report/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@
<source>..\..\core\azure-core-auth\src\main\java</source>
<source>..\..\core\azure-core-management\src\main\java</source>
<source>..\..\core\azure-core-test\src\main\java</source>
<source>..\..\eventhubs\client\src\main\java</source>
<source>..\..\eventhubs\client\azure-eventhubs\src\main\java</source>
<source>..\..\eventhubs\client\azure-eventhubs\src\samples\java</source>
</sources>
</configuration>
</execution>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

import com.azure.messaging.eventhubs.EventHubClient;
import com.azure.messaging.eventhubs.EventHubClientBuilder;

import java.util.concurrent.Semaphore;

/**
* Demonstrates how to fetch metadata from an Event Hub's partitions.
*/
public class GetEventHubMetadata {
public static void main(String[] args) throws InterruptedException {
Semaphore semaphore = new Semaphore(1);

// The connection string value can be obtained by:
// 1. Going to your Event Hubs namespace in Azure Portal.
// 2. Creating an Event Hub instance.
// 3. Creating a "Shared access policy" for your Event Hub instance.
// 4. Copying the connection string from the policy's properties.
String connectionString = "Endpoint={endpoint};SharedAccessKeyName={sharedAccessKeyName};SharedAccessKey={sharedAccessKey};EntityPath={eventHubPath}";

// Instantiate a client that will be used to call the service.
EventHubClient client = new EventHubClientBuilder()
.connectionString(connectionString)
.build();

// Acquiring the semaphore so that this sample does not end before all the partition properties are fetched.
semaphore.acquire();

// Querying the partition identifiers for the Event Hub. Then calling client.getPartitionProperties with the
// identifier to get information about each partition.
client.getPartitionIds().flatMap(partitionId -> client.getPartitionProperties(partitionId)).subscribe(properties -> {
System.out.println(String.format(
"Event Hub: %s, Partition Id: %s, Last Enqueued Sequence Number: %s, Last Enqueued Offset: %s",
properties.eventHubPath(), properties.id(), properties.lastEnqueuedSequenceNumber(),
properties.lastEnqueuedOffset()));
}, error -> {
System.err.println("Error occurred while fetching partition properties: " + error.toString());
}, () -> {
// Releasing the semaphore now that we've finished querying for partition properties.
semaphore.release();
});

System.out.println("Waiting for partition properties to complete...");
semaphore.acquire();
System.out.println("Finished.");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventHubClient;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.EventHubConsumer;
import com.azure.messaging.eventhubs.EventHubProducer;
import com.azure.messaging.eventhubs.EventHubProducerOptions;
import com.azure.messaging.eventhubs.EventPosition;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;

import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static java.nio.charset.StandardCharsets.UTF_8;

/**
* Sample demonstrates how to receive events from an Azure Event Hub instance.
*/
public class ReceiveEvent {
private static final Duration OPERATION_TIMEOUT = Duration.ofSeconds(30);
private static final int NUMBER_OF_EVENTS = 10;

public static void main(String[] args) throws InterruptedException, IOException {
CountDownLatch countDownLatch = new CountDownLatch(NUMBER_OF_EVENTS);

// The connection string value can be obtained by:
// 1. Going to your Event Hubs namespace in Azure Portal.
// 2. Creating an Event Hub instance.
// 3. Creating a "Shared access policy" for your Event Hub instance.
// 4. Copying the connection string from the policy's properties.
String connectionString = "Endpoint={endpoint};SharedAccessKeyName={sharedAccessKeyName};SharedAccessKey={sharedAccessKey};EntityPath={eventHubPath}";

// Instantiate a client that will be used to call the service.
EventHubClient client = new EventHubClientBuilder()
.connectionString(connectionString)
.build();

// To create a consumer, we need to know what partition to connect to. We take the first partition id.
// .blockFirst() here is used to synchronously block until the first partition id is emitted. The maximum wait
// time is set by passing in the OPERATION_TIMEOUT value. If no item is emitted before the timeout elapses, a
// TimeoutException is thrown.
String firstPartition = client.getPartitionIds().blockFirst(OPERATION_TIMEOUT);

// Create a consumer.
// The "$Default" consumer group is created by default. This value can be found by going to the Event Hub
// instance you are connecting to, and selecting the "Consumer groups" page. EventPosition.latest() tells the
// service we only want events that are sent to the partition after we begin listening.
EventHubConsumer consumer = client.createConsumer(EventHubClient.DEFAULT_CONSUMER_GROUP_NAME,
firstPartition, EventPosition.latest());

// We start receiving any events that come from `firstPartition`, print out the contents, and decrement the
// countDownLatch.
Disposable subscription = consumer.receive().subscribe(event -> {
String contents = UTF_8.decode(event.body()).toString();
System.out.println(String.format("[%s] Sequence Number: %s. Contents: %s", countDownLatch.getCount(),
event.offset(), contents));

countDownLatch.countDown();
});

// Because the consumer is only listening to new events, we need to send some events to `firstPartition`.
// This creates a producer that only sends events to `firstPartition`.
EventHubProducerOptions producerOptions = new EventHubProducerOptions().partitionId(firstPartition);
EventHubProducer producer = client.createProducer(producerOptions);

// We create 10 events to send to the service and block until the send has completed.
Flux.range(0, NUMBER_OF_EVENTS).flatMap(number -> {
String body = String.format("Hello world! Number: %s", number);
return producer.send(new EventData(body.getBytes(UTF_8)));
}).blockLast(OPERATION_TIMEOUT);

// We wait for all the events to be received before continuing.
countDownLatch.await(OPERATION_TIMEOUT.getSeconds(), TimeUnit.SECONDS);

// Dispose and close of all the resources we've created.
subscription.dispose();
producer.close();
consumer.close();
client.close();
}
}
63 changes: 63 additions & 0 deletions eventhubs/client/azure-eventhubs/src/samples/java/SendEvent.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

import com.azure.core.amqp.exception.AmqpException;
import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventHubClient;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.EventHubProducer;

import java.io.IOException;

import static java.nio.charset.StandardCharsets.UTF_8;

/**
* Sample demonstrates how to send a message to an Azure Event Hub.
*/
public class SendEvent {
public static void main(String[] args) {
// The connection string value can be obtained by:
// 1. Going to your Event Hubs namespace in Azure Portal.
// 2. Creating an Event Hub instance.
// 3. Creating a "Shared access policy" for your Event Hub instance.
// 4. Copying the connection string from the policy's properties.
String connectionString = "Endpoint={endpoint};SharedAccessKeyName={sharedAccessKeyName};SharedAccessKey={sharedAccessKey};EntityPath={eventHubPath}";

// Instantiate a client that will be used to call the service.
EventHubClient client = new EventHubClientBuilder()
.connectionString(connectionString)
.build();

// Create a producer. This overload of `createProducer` does not accept any arguments. Consequently, events
// sent from this producer are load balanced between all available partitions in the Event Hub instance.
EventHubProducer producer = client.createProducer();

// Create an event to send.
EventData data = new EventData("Hello world!".getBytes(UTF_8));

// Send that event. This call returns a Mono<Void>, which we subscribe to. It completes successfully when the
// event has been delivered to the Event Hub. It completes with an error if an exception occurred while sending
// the event.
producer.send(data).subscribe(
(ignored) -> System.out.println("Event sent."),
error -> {
System.err.println("There was an error sending the event: " + error.toString());

if (error instanceof AmqpException) {
AmqpException amqpException = (AmqpException) error;

System.err.println(String.format("Is send operation retriable? %s. Error condition: %s",
amqpException.isTransient(), amqpException.getErrorCondition()));
}
}, () -> {
// Disposing of our producer and client.
try {
producer.close();
} catch (IOException e) {
System.err.println("Error encountered while closing producer: " + e.toString());
}

client.close();
});
}
}
1 change: 1 addition & 0 deletions pom.client.xml
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,7 @@
<additionalOptions>-maxLineLength 120
-snippetpath ${project.basedir}/applicationconfig/client/src/samples/java
-snippetpath ${project.basedir}/core/azure-core/src/samples/java
-snippetpath ${project.basedir}/eventhubs/client/azure-eventhubs/src/samples/java
-snippetpath ${project.basedir}/keyvault/client/keys/src/samples/java
-snippetpath ${project.basedir}/keyvault/client/secrets/src/samples/java
</additionalOptions>
Expand Down

0 comments on commit 7c5ac25

Please sign in to comment.