Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Event Hubs Samples #4053

Merged
merged 8 commits into from
Jun 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion eng/spotbugs-aggregate-report/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@
<source>..\..\core\azure-core-auth\src\main\java</source>
<source>..\..\core\azure-core-management\src\main\java</source>
<source>..\..\core\azure-core-test\src\main\java</source>
<source>..\..\eventhubs\client\src\main\java</source>
<source>..\..\eventhubs\client\azure-eventhubs\src\main\java</source>
<source>..\..\eventhubs\client\azure-eventhubs\src\samples\java</source>
</sources>
</configuration>
</execution>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

import com.azure.messaging.eventhubs.EventHubClient;
import com.azure.messaging.eventhubs.EventHubClientBuilder;

import java.util.concurrent.Semaphore;

/**
* Demonstrates how to fetch metadata from an Event Hub's partitions.
*/
public class GetEventHubMetadata {
public static void main(String[] args) throws InterruptedException {
Semaphore semaphore = new Semaphore(1);

// The connection string value can be obtained by:
// 1. Going to your Event Hubs namespace in Azure Portal.
// 2. Creating an Event Hub instance.
// 3. Creating a "Shared access policy" for your Event Hub instance.
// 4. Copying the connection string from the policy's properties.
String connectionString = "Endpoint={endpoint};SharedAccessKeyName={sharedAccessKeyName};SharedAccessKey={sharedAccessKey};EntityPath={eventHubPath}";

// Instantiate a client that will be used to call the service.
EventHubClient client = new EventHubClientBuilder()
.connectionString(connectionString)
.build();

// Acquiring the semaphore so that this sample does not end before all the partition properties are fetched.
semaphore.acquire();

// Querying the partition identifiers for the Event Hub. Then calling client.getPartitionProperties with the
// identifier to get information about each partition.
client.getPartitionIds().flatMap(partitionId -> client.getPartitionProperties(partitionId)).subscribe(properties -> {
System.out.println(String.format(
"Event Hub: %s, Partition Id: %s, Last Enqueued Sequence Number: %s, Last Enqueued Offset: %s",
properties.eventHubPath(), properties.id(), properties.lastEnqueuedSequenceNumber(),
properties.lastEnqueuedOffset()));
}, error -> {
System.err.println("Error occurred while fetching partition properties: " + error.toString());
}, () -> {
// Releasing the semaphore now that we've finished querying for partition properties.
semaphore.release();
});

System.out.println("Waiting for partition properties to complete...");
semaphore.acquire();
System.out.println("Finished.");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventHubClient;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.EventHubConsumer;
import com.azure.messaging.eventhubs.EventHubProducer;
import com.azure.messaging.eventhubs.EventHubProducerOptions;
import com.azure.messaging.eventhubs.EventPosition;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;

import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static java.nio.charset.StandardCharsets.UTF_8;

/**
* Sample demonstrates how to receive events from an Azure Event Hub instance.
*/
public class ReceiveEvent {
private static final Duration OPERATION_TIMEOUT = Duration.ofSeconds(30);
private static final int NUMBER_OF_EVENTS = 10;

public static void main(String[] args) throws InterruptedException, IOException {
CountDownLatch countDownLatch = new CountDownLatch(NUMBER_OF_EVENTS);

// The connection string value can be obtained by:
// 1. Going to your Event Hubs namespace in Azure Portal.
// 2. Creating an Event Hub instance.
// 3. Creating a "Shared access policy" for your Event Hub instance.
// 4. Copying the connection string from the policy's properties.
String connectionString = "Endpoint={endpoint};SharedAccessKeyName={sharedAccessKeyName};SharedAccessKey={sharedAccessKey};EntityPath={eventHubPath}";

// Instantiate a client that will be used to call the service.
EventHubClient client = new EventHubClientBuilder()
.connectionString(connectionString)
.build();

// To create a consumer, we need to know what partition to connect to. We take the first partition id.
// .blockFirst() here is used to synchronously block until the first partition id is emitted. The maximum wait
// time is set by passing in the OPERATION_TIMEOUT value. If no item is emitted before the timeout elapses, a
// TimeoutException is thrown.
String firstPartition = client.getPartitionIds().blockFirst(OPERATION_TIMEOUT);

// Create a consumer.
// The "$Default" consumer group is created by default. This value can be found by going to the Event Hub
// instance you are connecting to, and selecting the "Consumer groups" page. EventPosition.latest() tells the
// service we only want events that are sent to the partition after we begin listening.
EventHubConsumer consumer = client.createConsumer(EventHubClient.DEFAULT_CONSUMER_GROUP_NAME,
firstPartition, EventPosition.latest());

// We start receiving any events that come from `firstPartition`, print out the contents, and decrement the
// countDownLatch.
Disposable subscription = consumer.receive().subscribe(event -> {
String contents = UTF_8.decode(event.body()).toString();
System.out.println(String.format("[%s] Sequence Number: %s. Contents: %s", countDownLatch.getCount(),
event.offset(), contents));

countDownLatch.countDown();
});

// Because the consumer is only listening to new events, we need to send some events to `firstPartition`.
// This creates a producer that only sends events to `firstPartition`.
EventHubProducerOptions producerOptions = new EventHubProducerOptions().partitionId(firstPartition);
EventHubProducer producer = client.createProducer(producerOptions);

// We create 10 events to send to the service and block until the send has completed.
Flux.range(0, NUMBER_OF_EVENTS).flatMap(number -> {
String body = String.format("Hello world! Number: %s", number);
return producer.send(new EventData(body.getBytes(UTF_8)));
}).blockLast(OPERATION_TIMEOUT);

// We wait for all the events to be received before continuing.
countDownLatch.await(OPERATION_TIMEOUT.getSeconds(), TimeUnit.SECONDS);

// Dispose and close of all the resources we've created.
subscription.dispose();
producer.close();
consumer.close();
client.close();
}
}
63 changes: 63 additions & 0 deletions eventhubs/client/azure-eventhubs/src/samples/java/SendEvent.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

import com.azure.core.amqp.exception.AmqpException;
import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventHubClient;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.EventHubProducer;

import java.io.IOException;

import static java.nio.charset.StandardCharsets.UTF_8;

/**
* Sample demonstrates how to send a message to an Azure Event Hub.
*/
public class SendEvent {
public static void main(String[] args) {
// The connection string value can be obtained by:
// 1. Going to your Event Hubs namespace in Azure Portal.
// 2. Creating an Event Hub instance.
// 3. Creating a "Shared access policy" for your Event Hub instance.
// 4. Copying the connection string from the policy's properties.
String connectionString = "Endpoint={endpoint};SharedAccessKeyName={sharedAccessKeyName};SharedAccessKey={sharedAccessKey};EntityPath={eventHubPath}";

// Instantiate a client that will be used to call the service.
EventHubClient client = new EventHubClientBuilder()
.connectionString(connectionString)
.build();

// Create a producer. This overload of `createProducer` does not accept any arguments. Consequently, events
// sent from this producer are load balanced between all available partitions in the Event Hub instance.
EventHubProducer producer = client.createProducer();

// Create an event to send.
EventData data = new EventData("Hello world!".getBytes(UTF_8));

// Send that event. This call returns a Mono<Void>, which we subscribe to. It completes successfully when the
// event has been delivered to the Event Hub. It completes with an error if an exception occurred while sending
// the event.
producer.send(data).subscribe(
(ignored) -> System.out.println("Event sent."),
error -> {
System.err.println("There was an error sending the event: " + error.toString());

if (error instanceof AmqpException) {
AmqpException amqpException = (AmqpException) error;

System.err.println(String.format("Is send operation retriable? %s. Error condition: %s",
amqpException.isTransient(), amqpException.getErrorCondition()));
}
}, () -> {
// Disposing of our producer and client.
try {
producer.close();
} catch (IOException e) {
System.err.println("Error encountered while closing producer: " + e.toString());
}

client.close();
});
}
}
1 change: 1 addition & 0 deletions pom.client.xml
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,7 @@
<additionalOptions>-maxLineLength 120
-snippetpath ${project.basedir}/applicationconfig/client/src/samples/java
-snippetpath ${project.basedir}/core/azure-core/src/samples/java
-snippetpath ${project.basedir}/eventhubs/client/azure-eventhubs/src/samples/java
-snippetpath ${project.basedir}/keyvault/client/keys/src/samples/java
-snippetpath ${project.basedir}/keyvault/client/secrets/src/samples/java
</additionalOptions>
Expand Down