Skip to content

Commit

Permalink
Support special @latest (end of stream) filter. (#89)
Browse files Browse the repository at this point in the history
  • Loading branch information
JamesBirdsall authored Apr 21, 2017
1 parent 551490c commit 018cfbb
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,35 @@ public void setInitialOffsetProvider(Function<String, Object> initialOffsetProvi
{
this.initialOffsetProvider = initialOffsetProvider;
}


/***
* A prefab initial offset provider that starts from the first event available.
*
* How to use this initial offset provider: setInitialOffsetProvider(new EventProcessorOptions.StartOfStreamInitialOffsetProvider());
*/
public class StartOfStreamInitialOffsetProvider implements Function<String, Object>
{
@Override
public Object apply(String t)
{
return PartitionReceiver.START_OF_STREAM;
}
}

/***
* A prefab initial offset provider that starts from the next event that becomes available.
*
* How to use this initial offset provider: setInitialOffsetProvider(new EventProcessorOptions.EndOfStreamInitialOffsetProvider());
*/
public class EndOfStreamInitialOffsetProvider implements Function<String, Object>
{
@Override
public Object apply(String t)
{
return PartitionReceiver.END_OF_STREAM;
}
}

/***
* Returns whether the EventProcessorHost will call IEventProcessor.onEvents() with an empty iterable
* when a receive timeout occurs (true) or not (false).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@ public final class PartitionReceiver extends ClientEntity implements IReceiverSe
*/
public static final String START_OF_STREAM = "-1";

/**
* This is a constant defined to represent the current end of a partition stream in EventHub.
* This can be used as an offset argument in receiver creation to start receiving from the latest
* event, instead of a specific offset or point in time.
*/
public static final String END_OF_STREAM = "@latest";

private final String partitionId;
private final MessagingFactory underlyingFactory;
private final String eventHubName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import java.time.Instant;
import java.util.Iterator;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;

import org.junit.After;
Expand Down Expand Up @@ -69,6 +70,18 @@ public void testReceiverStartOfStreamFilters() throws ServiceBusException
}
}

@Test()
public void testReceiverLatestFilter() throws ServiceBusException, ExecutionException, InterruptedException
{
offsetReceiver = ehClient.createReceiverSync(cgName, partitionId, PartitionReceiver.END_OF_STREAM, false);
Iterable<EventData> events = offsetReceiver.receiveSync(100);
Assert.assertTrue(events == null);

TestBase.pushEventsToPartition(ehClient, partitionId, 10).get();
events = offsetReceiver.receiveSync(100);
Assert.assertTrue(events != null && events.iterator().hasNext());
}

@Test()
public void testReceiverOffsetInclusiveFilter() throws ServiceBusException
{
Expand Down

0 comments on commit 018cfbb

Please sign in to comment.