StreamableKafkaMessageSource create token implementations #30

Closed
shubhojitr opened this issue Dec 23, 2019 · 3 comments · Fixed by #403

Comments

@shubhojitr

I'm using Axon 3.4.1 with Kafka as a StreamableMessageSource.

There are scenarios where I would like to stop a particular tracking processor, reset its token (or call createTokenAt), and start it again. Stopping and starting are straightforward, but I cannot reset the token to a particular timestamp because KafkaMessageSource does not provide an implementation of createTokenAt(Instant dateTime). It would be good to have this feature implemented.
I've checked axonframework.extensions.kafka 4.0-RC2 and it's not implemented there either.

The code snippet below throws an UnsupportedOperationException.

@Autowired
private EventProcessingConfiguration configuration;

@PostMapping("/createTokenAt")
public ResponseEntity<?> createTokenAt(@RequestParam(value = "processorName") String processorName,
                                       @RequestParam(value = "tokenTime") String tokenTime) {
    // tokenTime is expected in ISO-8601 format, e.g. 2007-12-03T10:15:30.00Z

    configuration.eventProcessorByProcessingGroup(processorName, TrackingEventProcessor.class)
            .ifPresent(trackingEventProcessor -> {
                // createTokenAt is the call that throws UnsupportedOperationException with the Kafka message source
                trackingEventProcessor.resetTokens(streamableMessageSource -> streamableMessageSource.createTokenAt(Instant.parse(tokenTime)));
            });
    return new ResponseEntity<>(HttpStatus.OK);
}

@smcvb
Member

smcvb commented Dec 23, 2019

Thanks for filing this issue @shubhojitr. I had it in the back of my mind somewhere anyhow, but hadn't found the time to draft up a description for it.

I am still thinking about how this is going to be implemented. The StreamableMessageSource works with timestamps, whilst, as you might know, Kafka deals with offsets. I'm guessing there's an API to retrieve the offset at a given timestamp, but if there isn't... well, we'll need to dig deeper in that scenario.
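For what it's worth, the Kafka Java consumer does expose such a lookup (`KafkaConsumer#offsetsForTimes`), which returns, per partition, the earliest offset whose timestamp is greater than or equal to the requested one. The block below is only a dependency-free sketch of that timestamp-to-offset mapping, not the extension's implementation: the class `TimestampToOffsetSketch`, the methods `offsetAt` and `createPositionsAt`, the in-memory arrays standing in for partition logs, and the fallback to the log end offset when no record matches are all assumptions made for illustration.

```java
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.Map;

public class TimestampToOffsetSketch {

    /**
     * Hypothetical stand-in for one partition's log: array index = offset,
     * value = record timestamp in epoch millis.
     * Returns the earliest offset whose timestamp is >= target. If no record
     * qualifies, falls back to the log end offset (an assumption of this sketch).
     */
    static long offsetAt(long[] timestampsByOffset, Instant target) {
        long targetMillis = target.toEpochMilli();
        // Linear scan, because record timestamps are not guaranteed to be ordered.
        for (int offset = 0; offset < timestampsByOffset.length; offset++) {
            if (timestampsByOffset[offset] >= targetMillis) {
                return offset;
            }
        }
        return timestampsByOffset.length;
    }

    /** Builds a partition -> offset map for the given point in time. */
    static Map<Integer, Long> createPositionsAt(Map<Integer, long[]> partitions, Instant target) {
        Map<Integer, Long> positions = new LinkedHashMap<>();
        partitions.forEach((partition, timestamps) -> positions.put(partition, offsetAt(timestamps, target)));
        return positions;
    }

    public static void main(String[] args) {
        Map<Integer, long[]> partitions = new LinkedHashMap<>();
        partitions.put(0, new long[]{1_000L, 2_000L, 3_000L});
        partitions.put(1, new long[]{1_500L, 2_500L});
        System.out.println(createPositionsAt(partitions, Instant.ofEpochMilli(2_000L)));
    }
}
```

A createTokenAt(Instant) implementation could, under these assumptions, wrap such a partition-to-offset map in whatever token type the message source uses. How the offsets are then interpreted (next record to read versus last record processed) is left open here.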

Note that the upcoming release candidate for this extension introduces a SubscribableKafkaMessageSource. Using that instead of the current StreamableKafkaMessageSource lets you use Kafka's own idea of resetting, as the former message source is provided to a SubscribingEventProcessor.

By the way, I would recommend upgrading to a more recent version of Axon (4.2.1 at the moment). We have not had any active development on 3.x for over a year now and I do not envision this changing in the near future.

@smcvb changed the title from "KafkaMessageSource Implementation for createTokenAt(Instant dateTime)" to "StreamableKafkaMessageSource create token implementations" on Dec 23, 2019
@shubhojitr
Author

@smcvb Thanks for your reply. I'll definitely look into SubscribableKafkaMessageSource. Is there a timeline for the new Kafka RC release?
And yes, as you recommended, I'm in the process of upgrading my app to the latest version of Axon.

@smcvb
Member

smcvb commented Dec 24, 2019

RC3 should be out there before the end of the year. :-)
