diff --git a/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/EventProcessorOptions.java b/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/EventProcessorOptions.java index cb8800e1a..7af06687b 100644 --- a/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/EventProcessorOptions.java +++ b/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/EventProcessorOptions.java @@ -146,7 +146,35 @@ public void setInitialOffsetProvider(Function initialOffsetProvi { this.initialOffsetProvider = initialOffsetProvider; } - + + /*** + * A prefab initial offset provider that starts from the first event available. + * + * How to use this initial offset provider: setInitialOffsetProvider(new EventProcessorOptions.StartOfStreamInitialOffsetProvider()); + */ + public class StartOfStreamInitialOffsetProvider implements Function + { + @Override + public Object apply(String t) + { + return PartitionReceiver.START_OF_STREAM; + } + } + + /*** + * A prefab initial offset provider that starts from the next event that becomes available. + * + * How to use this initial offset provider: setInitialOffsetProvider(new EventProcessorOptions.EndOfStreamInitialOffsetProvider()); + */ + public class EndOfStreamInitialOffsetProvider implements Function + { + @Override + public Object apply(String t) + { + return PartitionReceiver.END_OF_STREAM; + } + } + /*** * Returns whether the EventProcessorHost will call IEventProcessor.onEvents() with an empty iterable * when a receive timeout occurs (true) or not (false). diff --git a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionReceiver.java b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionReceiver.java index c36826ec4..8ec356ce3 100644 --- a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionReceiver.java +++ b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionReceiver.java @@ -59,6 +59,13 @@ public final class PartitionReceiver extends ClientEntity implements IReceiverSe */ public static final String START_OF_STREAM = "-1"; + /** + * This is a constant defined to represent the current end of a partition stream in EventHub. + * This can be used as an offset argument in receiver creation to start receiving from the latest + * event, instead of a specific offset or point in time. + */ + public static final String END_OF_STREAM = "@latest"; + private final String partitionId; private final MessagingFactory underlyingFactory; private final String eventHubName; diff --git a/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/sendrecv/ReceiveTest.java b/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/sendrecv/ReceiveTest.java index 58d377682..3d5c244e1 100644 --- a/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/sendrecv/ReceiveTest.java +++ b/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/sendrecv/ReceiveTest.java @@ -6,6 +6,7 @@ import java.time.Instant; import java.util.Iterator; +import java.util.concurrent.ExecutionException; import java.util.function.Consumer; import org.junit.After; @@ -69,6 +70,18 @@ public void testReceiverStartOfStreamFilters() throws ServiceBusException } } + @Test() + public void testReceiverLatestFilter() throws ServiceBusException, ExecutionException, InterruptedException + { + offsetReceiver = ehClient.createReceiverSync(cgName, partitionId, PartitionReceiver.END_OF_STREAM, false); + Iterable events = offsetReceiver.receiveSync(100); + Assert.assertTrue(events == null); + + TestBase.pushEventsToPartition(ehClient, partitionId, 10).get(); + events = offsetReceiver.receiveSync(100); + Assert.assertTrue(events != null && events.iterator().hasNext()); + } + @Test() public void testReceiverOffsetInclusiveFilter() throws ServiceBusException {