Skip to content

Latest commit

 

History

History
 
 

Folders and files

NameName
Last commit message
Last commit date

parent directory

..
 
 
 
 
 
 
 
 
page_type languages products description urlFragment
sample
java
azure-event-hubs
Azure Spring Cloud Stream Binder Sample project for Event Hub client library
azure-spring-cloud-sample-eventhubs-binder

Azure Spring Cloud Stream Binder for Event Hub Code Sample shared library for Java

Key concepts

This code sample demonstrates how to use the Spring Cloud Stream Binder for Azure Event Hub.The sample app has two operating modes. One way is to expose a Restful API to receive string message, another way is to automatically provide string messages. These messages are published to an event hub. The sample will also consume messages from the same event hub.

Getting started

Running this sample will be charged by Azure. You can check the usage and bill at this link.

Create Azure resources

We have several ways to config the Spring Cloud Stream Binder for Azure Event Hub. You can choose anyone of them.

Important

When using the Restful API to send messages, the Active profiles must contain manual.

Method 1: Connection string based usage

  1. Create Azure Event Hubs. Please note Basic tier is unsupported. After creating the Azure Event Hub, you can create your own Consumer Group or use the default "$Default" Consumer Group.

  2. Create Azure Storage for checkpoint use.

  3. Update application.yaml.

    spring:
      cloud:
        azure:
          eventhub:
            # Fill event hub namespace connection string copied from portal
            connection-string: [eventhub-namespace-connection-string] 
            # Fill checkpoint storage account name, access key and container 
            checkpoint-storage-account: [checkpoint-storage-account]
            checkpoint-access-key: [checkpoint-access-key]
            checkpoint-container: [checkpoint-container]
        stream:
          function:
            definition: consume;supply
          bindings:
            consume-in-0:
              destination: [eventhub-name]
              group: [consumer-group]
            supply-out-0:
              destination: [the-same-eventhub-name-as-above]

Method 2: Service principal based usage

  1. Create a service principal for use in by your app. Please follow create service principal from Azure CLI.

  2. Create Azure Event Hubs. Please note Basic tier is unsupported. After creating the Azure Event Hub, you can create your own Consumer Group or use the default "$Default" Consumer Group.

  3. Create Azure Storage for checkpoint use.

  4. Add Role Assignment for Event Hub, Storage Account and Resource group. See Service principal for Azure resources with Event Hubs to add role assignment for Event Hub, Storage Account, Resource group is similar.

    • Resource group: assign Contributor role for service principal.
    • Event Hub: assign Contributor role for service principal.
    • Storage Account: assign Storage Account Key Operator Service Role role for service principal.
  5. Update application-sp.yaml.

    spring:
      cloud:
        azure:
          client-id: [service-principal-id]
          client-secret: [service-principal-secret]
          tenant-id: [tenant-id]
          resource-group: [resource-group]
          eventhub:
            namespace: [eventhub-namespace]
            checkpoint-storage-account: [checkpoint-storage-account]
            checkpoint-container: [checkpoint-container]
        stream:
          function:
            definition: consume;supply
          bindings:
            consume-in-0:
              destination: [eventhub-name]
              group: [consumer-group]
            supply-out-0:
              destination: [the-same-eventhub-name-as-above]

    We should specify spring.profiles.active=sp to run the Spring Boot application.

Method 3: MSI credential based usage

Set up managed identity

Please follow create managed identity to set up managed identity.

Create other Azure resources
  1. Create Azure Event Hubs. Please note Basic tier is unsupported. After creating the Azure Event Hub, you can create your own Consumer Group or use the default "$Default" Consumer Group.

  2. Create Azure Storage for checkpoint use.

  3. Add Role Assignment for Event Hub and Storage Account. See Managed identities for Azure resources with Event Hubs to add role assignment for Event Hub, Storage Account is similar.

    • Event Hub: assign Contributor role for managed identity.
    • Storage Account: assign Storage Account Key Operator Service Role role for managed identity.
Update MSI related properties
  1. Update application-mi.yaml
    spring:
      cloud:
        azure:
          msi-enabled: true
          client-id: [the-id-of-managed-identity]
          resource-group: [resource-group]
          # Fill subscription ID copied from portal
          subscription-id: [subscription-id]
          eventhub:
            namespace: [eventhub-namespace]
            checkpoint-storage-account: [checkpoint-storage-account]
            checkpoint-container: [checkpoint-container]
        stream:
          function:
            definition: consume;supply
          bindings:
            consume-in-0:
              destination: [eventhub-name]
              group: [consumer-group]
            supply-out-0:
              destination: [the-same-eventhub-name-as-above]

    We should specify spring.profiles.active=mi to run the Spring Boot application. For App Service, please add a configuration entry for this.

Redeploy Application

If you update the spring.cloud.azure.managed-identity.client-id property after deploying the app, or update the role assignment for services, please try to redeploy the app again.

You can follow Deploy a Spring Boot JAR file to Azure App Service to deploy this application to App Service

Enable auto create

If you want to auto create the Azure Event Hub and Azure Storage account instances, make sure you add such properties (only support the service principal and managed identity cases):

spring:
  cloud:
    azure:
      subscription-id: [subscription-id]
      auto-create-resources: true
      environment: Azure
      region: [region]

Enable sync message

To enable message sending in a synchronized way with Spring Cloud Stream 3.x, azure-spring-cloud-stream-binder-eventhubs supports the sync producer mode to get responses for sent messages. By enabling following configuration, you could use StreamBridge for the synchronized message producing.

spring:
  cloud:
    stream:
      eventhub:
        bindings:
          supply-out-0:
            producer:
              sync: true

Using Batch Consuming

To enable batch consuming feature, you should add below configuration in the batch profile.

spring:
  cloud:
    stream:
      bindings:
        consume-in-0:
          destination: [eventhub-name]
          group: [consumer-group]
          consumer:
            batch-mode: true 
      eventhub:
        bindings:
          consume-in-0:
            consumer:
              checkpoint-mode: BATCH # or MANUAL as needed
              max-batch-size: [max-batch-size] # The default valueis 10
              max-wait-time: [max-wait-time] # Optional, the default value is null

For checkpointing mode as BATCH, you can use below code to send messages and consume in batches, see the BatchConsumerConfiguration.java

    @Bean
    public Consumer<List<String>> consume() {
        return list -> list.forEach(event -> LOGGER.info("New event received: '{}'",event));
    }

    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("\"Hello world"+ i++ +"\"").build();
        };
    }

For checkpointing mode as MANUAL, you can use below code to send messages and consume/checkpoint in batches.

    @Bean
    public Consumer<Message<List<String>>> consume() {
        return message -> {
            for (int i = 0; i < message.getPayload().size(); i++) {
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                    message.getPayload().get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.PARTITION_KEY)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.SEQUENCE_NUMBER)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.OFFSET)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.ENQUEUED_TIME)).get(i));
            }
        
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            checkpointer.success()
                        .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                        .doOnError(error -> LOGGER.error("Exception found", error))
                        .subscribe();
        };
    }

    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("\"Hello world"+ i++ +"\"").build();
        };
    }

Examples

  1. Run the mvn spring-boot:run in the root of the code sample to get the app running.

  2. Send a POST request

    $ ### Send messages through imperative.  
    $ curl -X POST http://localhost:8080/messages/imperative/staticalDestination?message=hello
    $ curl -X POST http://localhost:8080/messages/imperative/dynamicDestination?message=hello
    
    $ ### Send messages through reactive.
    $ curl -X POST http://localhost:8080/messages/reactive?message=hello
    

    or when the app runs on App Service or VM

    $ ### Send messages through imperative.
    $ curl -d -X POST https://[your-app-URL]/messages/imperative/staticalDestination?message=hello
    $ curl -d -X POST https://[your-app-URL]/messages/imperative/dynamicDestination?message=hello
    
    $ ### Send messages through reactive.
    $ curl -d -X POST https://[your-app-URL]/messages/reactive?message=hello
    
  3. Verify in your app’s logs that a similar message was posted:

    New message received: 'hello', partition key: 2002572479, sequence number: 4, offset: 768, enqueued time: 2021-06-03T01:47:36.859Z
    Message 'hello' successfully checkpointed
    
  4. Delete the resources on Azure Portal to avoid unexpected charges.

Troubleshooting

Next steps

Contributing