Tracking Token Updated if Creating a Publisher Fails #454

Closed
n3ziniuka5 opened this issue Oct 13, 2023 · 9 comments · Fixed by #455

Comments

@n3ziniuka5

Basic information

I am setting up Axon to publish events to Kafka. I've tried various confirmation and event processor modes, and with every combination I've managed to reproduce an issue where the tracking token is updated while Kafka is down, so some events are lost once Kafka is back up.

I am not fully up to speed with Axon internals, but one thing that jumps out in KafkaPublisher.send is that only the Kafka commit happens inside uow.onPrepareCommit. Other important steps, such as creating the producer and sending the message, both of which can throw exceptions, happen outside it.
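To make this concern concrete, here is a toy model in plain Java. ToyUnitOfWork and PlacementDemo are hypothetical classes, not Axon's UnitOfWork or KafkaPublisher. The model assumes that an exception thrown directly inside the event handler is logged and swallowed, while a failure inside a prepare-commit hook aborts the commit. Under that assumption, only the deferred variant keeps the token from advancing.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily simplified unit of work; not Axon's UnitOfWork.
final class ToyUnitOfWork {
    private final List<Runnable> prepareCommitHooks = new ArrayList<>();

    void onPrepareCommit(Runnable hook) {
        prepareCommitHooks.add(hook);
    }

    /** Runs all prepare-commit hooks; returns true only if every hook succeeded. */
    boolean commit() {
        try {
            prepareCommitHooks.forEach(Runnable::run);
            return true;
        } catch (RuntimeException e) {
            return false; // commit aborted, so the token update must not happen
        }
    }
}

final class PlacementDemo {
    /** Returns the token position after processing one event, starting from 0. */
    static long process(boolean sendInsidePrepareCommit, Runnable send) {
        long token = 0;
        ToyUnitOfWork uow = new ToyUnitOfWork();
        try {
            if (sendInsidePrepareCommit) {
                uow.onPrepareCommit(send); // deferred: failure aborts the commit
            } else {
                send.run(); // eager: failure surfaces inside the handler
            }
        } catch (RuntimeException e) {
            // Assumption: a handler-level failure is only logged, then processing continues.
            System.out.println("handler failed, logged and ignored: " + e.getMessage());
        }
        if (uow.commit()) {
            token++; // the token store update belongs to a successful commit
        }
        return token;
    }
}
```

With a failing send, the eager variant still returns 1 (the event is skipped), while the deferred variant returns 0 and the event would be retried.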

  • Axon Framework version: 4.8.2
  • JDK version: 17
  • Kafka Extension version: 4.8.0

Steps to reproduce

I used the following Spring Boot configuration to publish events to Kafka:

axon:
  kafka:
    bootstrap-servers: localhost:9092
    client-id: axon-kafka-test
    default-topic: local.event
    properties:
      security.protocol: PLAINTEXT

    publisher:
      confirmation-mode: transactional

    producer:
      transaction-id-prefix: axon-kafka-test
      event-processor-mode: pooled-streaming

Steps to reproduce:

  • Start the Axon application
  • Invoke a command that produces some events
  • Confirm the events were received in Kafka, and note the token's sequence number in the token store
  • Shut down Kafka
  • Invoke a command that produces some events
  • Note that the sequence number in the token store has increased
  • Stop the Axon application
  • Start Kafka
  • Start the Axon application
  • Confirm that the event(s) produced while Kafka was down are missing from Kafka

Expected behaviour

The Kafka event processor's tracking token shouldn't be updated, and all events should eventually be published to Kafka once it recovers.
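The expected behaviour amounts to an at-least-once invariant: the token only moves past an event once Kafka has accepted it. The toy model below (hypothetical ExpectedBehaviour class, in-memory lists standing in for the event store and the topic) replays the reproduction steps under that invariant.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of the expected at-least-once behaviour; not Axon code.
final class ExpectedBehaviour {
    final List<String> eventStore = new ArrayList<>(); // append-only event stream
    final List<String> kafkaTopic = new ArrayList<>(); // what consumers can see
    boolean brokerUp = true;
    int token = 0; // index of the next event to publish

    void publish(String event) {
        if (!brokerUp) {
            throw new IllegalStateException("broker unavailable");
        }
        kafkaTopic.add(event);
    }

    /** Publishes pending events; the token only moves past events Kafka accepted. */
    void processAvailable() {
        while (token < eventStore.size()) {
            try {
                publish(eventStore.get(token));
            } catch (IllegalStateException e) {
                return; // keep the token where it is and retry later
            }
            token++;
        }
    }
}
```

Events appended while the broker is down stay pending, and the next pass after recovery publishes them in order.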

Actual behaviour

The Kafka event processor's tracking token is updated even while Kafka is down.

@gklijs

gklijs commented Oct 13, 2023

Hi Laurynas, thanks for reporting. I might have a look today. I do have one question, which token store did you use?

@n3ziniuka5
Author

Hello Gerard, I appreciate you looking into this. I used the MongoDB token store.

@n3ziniuka5
Author

n3ziniuka5 commented Oct 13, 2023

Here's how the token store is configured:

import com.mongodb.ReadConcern
import com.mongodb.ReadPreference
import com.mongodb.TransactionOptions
import com.mongodb.WriteConcern
import com.mongodb.client.MongoClient
import org.axonframework.common.transaction.TransactionManager
import org.axonframework.eventhandling.tokenstore.TokenStore
import org.axonframework.extensions.mongo.DefaultMongoTemplate
import org.axonframework.extensions.mongo.MongoTemplate
import org.axonframework.extensions.mongo.eventsourcing.tokenstore.MongoTokenStore
import org.axonframework.serialization.Serializer
import org.axonframework.spring.messaging.unitofwork.SpringTransactionManager
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.boot.autoconfigure.mongo.MongoProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.data.mongodb.MongoDatabaseFactory
import org.springframework.data.mongodb.MongoTransactionManager

@Configuration
class AxonConfiguration {

  @Bean
  fun axonTemplate(client: MongoClient, mongoProperties: MongoProperties): MongoTemplate =
    DefaultMongoTemplate.builder()
      .mongoDatabase(client, mongoProperties.database)
      .build()

  @Bean
  fun axonTokenStore(axonTemplate: MongoTemplate,
                     serializer: Serializer,
                     axonTransactionManager: TransactionManager): TokenStore =
    MongoTokenStore.builder()
      .mongoTemplate(axonTemplate)
      .serializer(serializer)
      .transactionManager(axonTransactionManager)
      .build()

  @Bean
  fun axonTransactionManager(mongoDatabaseFactory: MongoDatabaseFactory): TransactionManager =
    SpringTransactionManager(MongoTransactionManager(mongoDatabaseFactory,
                                                     TransactionOptions.builder()
                                                       .readPreference(ReadPreference.primary())
                                                       .readConcern(ReadConcern.SNAPSHOT)
                                                       .writeConcern(WriteConcern.MAJORITY)
                                                       .build()))
}

@gklijs

gklijs commented Oct 13, 2023

I was able to reproduce the issue, I'll dive into it further.

@n3ziniuka5
Author

@gklijs unfortunately, I am still experiencing an issue where publishing fails but the token is updated anyway. This time, the cause was a topic authorization failure:

2023-11-02T13:11:01.063+02:00  INFO 32758 --- [ishing-group]-0] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=axon-users, transactionalId=axon-users0] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-11-02T13:11:01.421+02:00  INFO 32758 --- [ad | axon-users] org.apache.kafka.clients.Metadata        : [Producer clientId=axon-users, transactionalId=axon-users0] Cluster ID: staging
2023-11-02T13:11:01.428+02:00  INFO 32758 --- [ad | axon-users] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=axon-users, transactionalId=axon-users0] Discovered transaction coordinator localhost:9093 (id: 500198333 rack: null)
2023-11-02T13:11:01.876+02:00  INFO 32758 --- [ad | axon-users] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=axon-users, transactionalId=axon-users0] ProducerId set to 1197 with epoch 0
2023-11-02T13:11:01.912+02:00  WARN 32758 --- [ad | axon-users] org.apache.kafka.clients.NetworkClient   : [Producer clientId=axon-users, transactionalId=axon-users0] Error while fetching metadata with correlation id 7 : {laurynas-test=TOPIC_AUTHORIZATION_FAILED}
2023-11-02T13:11:01.913+02:00 ERROR 32758 --- [ad | axon-users] org.apache.kafka.clients.Metadata        : [Producer clientId=axon-users, transactionalId=axon-users0] Topic authorization failed for topics [laurynas-test]
2023-11-02T13:11:01.913+02:00  INFO 32758 --- [ishing-group]-0] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=axon-users, transactionalId=axon-users0] Transiting to abortable error state due to org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [laurynas-test]
2023-11-02T13:11:01.914+02:00  INFO 32758 --- [ishing-group]-0] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=axon-users, transactionalId=axon-users0] Aborting incomplete transaction
2023-11-02T13:11:01.915+02:00  WARN 32758 --- [ishing-group]-0] o.a.e.TrackingEventProcessor             : Error occurred. Starting retry mode.

org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
        at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:1010) ~[kafka-clients-3.3.2.jar:na]
        at org.apache.kafka.clients.producer.internals.TransactionManager.lambda$beginCommit$2(TransactionManager.java:259) ~[kafka-clients-3.3.2.jar:na]
        at org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1116) ~[kafka-clients-3.3.2.jar:na]
        at org.apache.kafka.clients.producer.internals.TransactionManager.beginCommit(TransactionManager.java:258) ~[kafka-clients-3.3.2.jar:na]
        at org.apache.kafka.clients.producer.KafkaProducer.commitTransaction(KafkaProducer.java:792) ~[kafka-clients-3.3.2.jar:na]
        at org.axonframework.extensions.kafka.eventhandling.producer.DefaultProducerFactory$ProducerDecorator.commitTransaction(DefaultProducerFactory.java:259) ~[axon-kafka-4.9.0.jar:4.9.0]
        at org.axonframework.extensions.kafka.eventhandling.producer.KafkaPublisher.tryCommit(KafkaPublisher.java:180) ~[axon-kafka-4.9.0.jar:4.9.0]
        at org.axonframework.extensions.kafka.eventhandling.producer.KafkaPublisher.send(KafkaPublisher.java:160) ~[axon-kafka-4.9.0.jar:4.9.0]
        at org.axonframework.extensions.kafka.eventhandling.producer.KafkaEventPublisher.handle(KafkaEventPublisher.java:80) ~[axon-kafka-4.9.0.jar:4.9.0]
        at org.axonframework.eventhandling.SimpleEventHandlerInvoker.invokeHandlers(SimpleEventHandlerInvoker.java:128) ~[axon-messaging-4.9.0.jar:4.9.0]
        at org.axonframework.eventhandling.SimpleEventHandlerInvoker.handle(SimpleEventHandlerInvoker.java:114) ~[axon-messaging-4.9.0.jar:4.9.0]
        at org.axonframework.eventhandling.MultiEventHandlerInvoker.handle(MultiEventHandlerInvoker.java:91) ~[axon-messaging-4.9.0.jar:4.9.0]
        at org.axonframework.eventhandling.AbstractEventProcessor.processMessageInUnitOfWork(AbstractEventProcessor.java:195) ~[axon-messaging-4.9.0.jar:4.9.0]
        at org.axonframework.eventhandling.AbstractEventProcessor.lambda$null$1(AbstractEventProcessor.java:173) ~[axon-messaging-4.9.0.jar:4.9.0]
        at org.axonframework.messaging.DefaultInterceptorChain.proceed(DefaultInterceptorChain.java:57) ~[axon-messaging-4.9.0.jar:4.9.0]
        at org.axonframework.messaging.interceptors.CorrelationDataInterceptor.handle(CorrelationDataInterceptor.java:67) ~[axon-messaging-4.9.0.jar:4.9.0]
        at org.axonframework.messaging.DefaultInterceptorChain.proceed(DefaultInterceptorChain.java:55) ~[axon-messaging-4.9.0.jar:4.9.0]
        at org.axonframework.eventhandling.TrackingEventProcessor.lambda$new$1(TrackingEventProcessor.java:181) ~[axon-messaging-4.9.0.jar:4.9.0]
        at org.axonframework.messaging.DefaultInterceptorChain.proceed(DefaultInterceptorChain.java:55) ~[axon-messaging-4.9.0.jar:4.9.0]
        at org.axonframework.eventhandling.AbstractEventProcessor.lambda$null$2(AbstractEventProcessor.java:174) ~[axon-messaging-4.9.0.jar:4.9.0]
        at org.axonframework.tracing.Span.runCallable(Span.java:132) ~[axon-messaging-4.9.0.jar:4.9.0]
        at org.axonframework.eventhandling.AbstractEventProcessor.lambda$null$3(AbstractEventProcessor.java:170) ~[axon-messaging-4.9.0.jar:4.9.0]
        at org.axonframework.messaging.unitofwork.BatchingUnitOfWork.executeWithResult(BatchingUnitOfWork.java:92) ~[axon-messaging-4.9.0.jar:4.9.0]
        at org.axonframework.eventhandling.AbstractEventProcessor.lambda$processInUnitOfWork$4(AbstractEventProcessor.java:166) ~[axon-messaging-4.9.0.jar:4.9.0]
        at org.axonframework.tracing.Span.runCallable(Span.java:132) ~[axon-messaging-4.9.0.jar:4.9.0]
        at org.axonframework.eventhandling.AbstractEventProcessor.processInUnitOfWork(AbstractEventProcessor.java:165) ~[axon-messaging-4.9.0.jar:4.9.0]
        at org.axonframework.eventhandling.TrackingEventProcessor.processBatch(TrackingEventProcessor.java:491) ~[axon-messaging-4.9.0.jar:4.9.0]
        at org.axonframework.eventhandling.TrackingEventProcessor.processingLoop(TrackingEventProcessor.java:316) ~[axon-messaging-4.9.0.jar:4.9.0]
        at org.axonframework.eventhandling.TrackingEventProcessor$TrackingSegmentWorker.run(TrackingEventProcessor.java:1200) ~[axon-messaging-4.9.0.jar:4.9.0]
        at org.axonframework.eventhandling.TrackingEventProcessor$WorkerLauncher.cleanUp(TrackingEventProcessor.java:1402) ~[axon-messaging-4.9.0.jar:4.9.0]
        at org.axonframework.eventhandling.TrackingEventProcessor$WorkerLauncher.run(TrackingEventProcessor.java:1379) ~[axon-messaging-4.9.0.jar:4.9.0]
        at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Caused by: org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [laurynas-test]

2023-11-02T13:11:01.917+02:00  WARN 32758 --- [ishing-group]-0] o.a.e.TrackingEventProcessor             : Releasing claim on token and preparing for retry in 1s
2023-11-02T13:11:01.919+02:00  INFO 32758 --- [ishing-group]-0] o.a.e.TrackingEventProcessor             : Released claim
2023-11-02T13:11:02.924+02:00  INFO 32758 --- [ishing-group]-0] o.a.e.TrackingEventProcessor             : Fetched token: IndexTrackingToken{globalIndex=0} for segment: Segment[0/0]
2023-11-02T13:11:02.966+02:00  WARN 32758 --- [ad | axon-users] org.apache.kafka.clients.NetworkClient   : [Producer clientId=axon-users, transactionalId=axon-users0] Error while fetching metadata with correlation id 8 : {laurynas-test=TOPIC_AUTHORIZATION_FAILED}
2023-11-02T13:11:02.966+02:00 ERROR 32758 --- [ad | axon-users] org.apache.kafka.clients.Metadata        : [Producer clientId=axon-users, transactionalId=axon-users0] Topic authorization failed for topics [laurynas-test]
2023-11-02T13:11:02.967+02:00  INFO 32758 --- [ishing-group]-0] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=axon-users, transactionalId=axon-users0] Transiting to abortable error state due to org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [laurynas-test]
2023-11-02T13:11:02.967+02:00  INFO 32758 --- [ishing-group]-0] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=axon-users, transactionalId=axon-users0] Aborting incomplete transaction
2023-11-02T13:11:02.967+02:00  WARN 32758 --- [ishing-group]-0] o.a.e.TrackingEventProcessor             : Releasing claim on token and preparing for retry in 2s
2023-11-02T13:11:02.969+02:00  INFO 32758 --- [ishing-group]-0] o.a.e.TrackingEventProcessor             : Released claim
2023-11-02T13:11:04.971+02:00  INFO 32758 --- [ishing-group]-0] o.a.e.TrackingEventProcessor             : Fetched token: IndexTrackingToken{globalIndex=1} for segment: Segment[0/0]
2023-11-02T13:11:05.012+02:00  WARN 32758 --- [ad | axon-users] org.apache.kafka.clients.NetworkClient   : [Producer clientId=axon-users, transactionalId=axon-users0] Error while fetching metadata with correlation id 9 : {laurynas-test=TOPIC_AUTHORIZATION_FAILED}
2023-11-02T13:11:05.013+02:00 ERROR 32758 --- [ad | axon-users] org.apache.kafka.clients.Metadata        : [Producer clientId=axon-users, transactionalId=axon-users0] Topic authorization failed for topics [laurynas-test]
2023-11-02T13:11:05.013+02:00  INFO 32758 --- [ishing-group]-0] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=axon-users, transactionalId=axon-users0] Transiting to abortable error state due to org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [laurynas-test]
2023-11-02T13:11:05.013+02:00  INFO 32758 --- [ishing-group]-0] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=axon-users, transactionalId=axon-users0] Aborting incomplete transaction
2023-11-02T13:11:05.013+02:00  WARN 32758 --- [ishing-group]-0] o.a.e.TrackingEventProcessor             : Releasing claim on token and preparing for retry in 4s
2023-11-02T13:11:05.015+02:00  INFO 32758 --- [ishing-group]-0] o.a.e.TrackingEventProcessor             : Released claim
2023-11-02T13:11:09.017+02:00  INFO 32758 --- [ishing-group]-0] o.a.e.TrackingEventProcessor             : Fetched token: IndexTrackingToken{globalIndex=2} for segment: Segment[0/0]
2023-11-02T13:11:09.056+02:00  WARN 32758 --- [ad | axon-users] org.apache.kafka.clients.NetworkClient   : [Producer clientId=axon-users, transactionalId=axon-users0] Error while fetching metadata with correlation id 10 : {laurynas-test=TOPIC_AUTHORIZATION_FAILED}
2023-11-02T13:11:09.057+02:00 ERROR 32758 --- [ad | axon-users] org.apache.kafka.clients.Metadata        : [Producer clientId=axon-users, transactionalId=axon-users0] Topic authorization failed for topics [laurynas-test]
2023-11-02T13:11:09.057+02:00  INFO 32758 --- [ishing-group]-0] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=axon-users, transactionalId=axon-users0] Transiting to abortable error state due to org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [laurynas-test]
2023-11-02T13:11:09.057+02:00  INFO 32758 --- [ishing-group]-0] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=axon-users, transactionalId=axon-users0] Aborting incomplete transaction
2023-11-02T13:11:09.057+02:00  WARN 32758 --- [ishing-group]-0] o.a.e.TrackingEventProcessor             : Releasing claim on token and preparing for retry in 8s
2023-11-02T13:11:09.059+02:00  INFO 32758 --- [ishing-group]-0] o.a.e.TrackingEventProcessor             : Released claim
2023-11-02T13:11:17.061+02:00  INFO 32758 --- [ishing-group]-0] o.a.e.TrackingEventProcessor             : Fetched token: IndexTrackingToken{globalIndex=3} for segment: Segment[0/0]

As you can see, on each retry it fetched a token with an incremented index (Fetched token: IndexTrackingToken{globalIndex=0}, then 1, 2 and 3), even though every attempt failed.

Once I fixed the authorization issue and restarted the app, it didn't republish the failed events.
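The log shows the processor doubling its retry delay (1s, 2s, 4s, 8s) while the fetched globalIndex climbs from 0 to 3. For comparison, here is a toy retry loop (hypothetical RetryModel class; delays are recorded rather than slept, and resetting the delay after a success is an assumption of the model) in which the token is only written back after a successful batch. Every retry then re-fetches the same index.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntPredicate;

// Hypothetical model of a retrying processor; not TrackingEventProcessor.
final class RetryModel {
    record Attempt(int fetchedIndex, long delayBeforeNextMillis) {}

    /** publish.test(index) returns true when the event at that index was published. */
    static List<Attempt> run(IntPredicate publish, int maxAttempts) {
        List<Attempt> attempts = new ArrayList<>();
        int storedToken = 0;      // what the token store holds
        long delayMillis = 1_000; // first retry after 1s, doubling afterwards, as in the log
        for (int i = 0; i < maxAttempts; i++) {
            int fetched = storedToken; // "Fetched token" on each (re)claim
            if (publish.test(fetched)) {
                storedToken = fetched + 1; // batch committed: token moves past the event
                attempts.add(new Attempt(fetched, 0));
                delayMillis = 1_000; // assumption: back-off resets after a success
            } else {
                attempts.add(new Attempt(fetched, delayMillis)); // token left untouched
                delayMillis *= 2;
            }
        }
        return attempts;
    }
}
```

With a publisher that always fails, all four attempts fetch index 0 with delays of 1000, 2000, 4000 and 8000 ms, which is what the log would show if failed batches never stored their token.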

@gklijs

gklijs commented Nov 7, 2023

Are you sure you are using version 4.9.0 of the extension? I want to be sure before diving into this, as I don't understand how this could happen with the new code, although the line numbers suggest you are. Maybe the exception is triggered after calling commitTransaction()? In that case, we should not support using Kafka transactions in this extension. What do you think?

Also, please create a new issue when new problems come up. Although related, this issue did solve some of the problems, so instead of reopening this one, I'd rather have a new issue.
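The log is consistent with that reading: the TopicAuthorizationException appears on the producer's network thread, the producer is "Transiting to abortable error state", and the failure only surfaces when commitTransaction() is called. The toy state machine below (hypothetical ToyTxProducer, a simplification of the states named in the log, not the kafka-clients implementation) shows why a caller sees the error late and why aborting is the only way back to a usable state.

```java
// Hypothetical simplification of a transactional producer's states as named in the log.
final class ToyTxProducer {
    enum State { READY, IN_TRANSACTION, ABORTABLE_ERROR }

    private State state = State.READY;
    private RuntimeException lastError;

    void beginTransaction() {
        if (state != State.READY) {
            throw new IllegalStateException("not ready: " + state);
        }
        state = State.IN_TRANSACTION;
    }

    /** In this toy, send() never throws; an asynchronous failure only flips the state. */
    void send(String topic, boolean authorized) {
        if (!authorized) {
            lastError = new RuntimeException("Not authorized to access topics: [" + topic + "]");
            state = State.ABORTABLE_ERROR;
        }
    }

    void commitTransaction() {
        if (state == State.ABORTABLE_ERROR) {
            throw new IllegalStateException(
                    "Cannot execute transactional method because we are in an error state", lastError);
        }
        state = State.READY;
    }

    void abortTransaction() {
        state = State.READY; // aborting clears the abortable error
        lastError = null;
    }

    State state() {
        return state;
    }
}
```

In the toy, send() succeeds silently and the failure is only reported at commit time, matching the stack trace where the exception originates in beginCommit.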

@n3ziniuka5
Author

I am on 4.9.0. I've actually discovered a few additional issues:

  • When I added the extension, it didn't publish any events. I am guessing it's related to this change (AxonFramework/AxonFramework#2778). It only started publishing events after I manually changed the token type in the token store from ReplayToken to GlobalSequenceTrackingToken and set the token position to 0.
  • We had a serialization issue with one of the old events because we had removed the data classes needed to deserialize it:
org.axonframework.serialization.SerializationException: Error while deserializing object
   at org.axonframework.serialization.json.JacksonSerializer.deserialize(JacksonSerializer.java:207) ~[axon-messaging-4.9.0.jar!/:4.9.0]
   at org.axonframework.axonserver.connector.event.axon.GrpcMetaDataAwareSerializer.deserialize(GrpcMetaDataAwareSerializer.java:70) ~[axon-server-connector-4.9.0.jar!/:4.9.0]
   at org.axonframework.serialization.LazyDeserializingObject.getObject(LazyDeserializingObject.java:102) ~[axon-messaging-4.9.0.jar!/:4.9.0]
   at org.axonframework.serialization.SerializedMessage.serializePayload(SerializedMessage.java:123) ~[axon-messaging-4.9.0.jar!/:4.9.0]
   at org.axonframework.messaging.MessageDecorator.serializePayload(MessageDecorator.java:66) ~[axon-messaging-4.9.0.jar!/:4.9.0]
   at org.axonframework.extensions.kafka.eventhandling.DefaultKafkaMessageConverter.createKafkaMessage(DefaultKafkaMessageConverter.java:126) ~[axon-kafka-4.9.0.jar!/:4.9.0]
   at org.axonframework.extensions.kafka.eventhandling.producer.KafkaPublisher.send(KafkaPublisher.java:149) ~[axon-kafka-4.9.0.jar!/:4.9.0]
   at org.axonframework.extensions.kafka.eventhandling.producer.KafkaEventPublisher.handle(KafkaEventPublisher.java:80) ~[axon-kafka-4.9.0.jar!/:4.9.0]
   at org.axonframework.eventhandling.SimpleEventHandlerInvoker.invokeHandlers(SimpleEventHandlerInvoker.java:128) ~[axon-messaging-4.9.0.jar!/:4.9.0]
   at org.axonframework.eventhandling.SimpleEventHandlerInvoker.handle(SimpleEventHandlerInvoker.java:114) ~[axon-messaging-4.9.0.jar!/:4.9.0]
   at org.axonframework.eventhandling.MultiEventHandlerInvoker.handle(MultiEventHandlerInvoker.java:91) ~[axon-messaging-4.9.0.jar!/:4.9.0]
   at org.axonframework.eventhandling.AbstractEventProcessor.processMessageInUnitOfWork(AbstractEventProcessor.java:195) ~[axon-messaging-4.9.0.jar!/:4.9.0]
   at org.axonframework.eventhandling.AbstractEventProcessor.lambda$null$1(AbstractEventProcessor.java:173) ~[axon-messaging-4.9.0.jar!/:4.9.0]
   at org.axonframework.messaging.DefaultInterceptorChain.proceed(DefaultInterceptorChain.java:57) ~[axon-messaging-4.9.0.jar!/:4.9.0]
   at org.axonframework.messaging.interceptors.CorrelationDataInterceptor.handle(CorrelationDataInterceptor.java:67) ~[axon-messaging-4.9.0.jar!/:4.9.0]
   at org.axonframework.messaging.DefaultInterceptorChain.proceed(DefaultInterceptorChain.java:55) ~[axon-messaging-4.9.0.jar!/:4.9.0]
   at org.axonframework.eventhandling.TrackingEventProcessor.lambda$new$1(TrackingEventProcessor.java:181) ~[axon-messaging-4.9.0.jar!/:4.9.0]
   at org.axonframework.messaging.DefaultInterceptorChain.proceed(DefaultInterceptorChain.java:55) ~[axon-messaging-4.9.0.jar!/:4.9.0]
   at org.axonframework.eventhandling.AbstractEventProcessor.lambda$null$2(AbstractEventProcessor.java:174) ~[axon-messaging-4.9.0.jar!/:4.9.0]
   at org.axonframework.tracing.Span.runCallable(Span.java:132) ~[axon-messaging-4.9.0.jar!/:4.9.0]
   at org.axonframework.eventhandling.AbstractEventProcessor.lambda$null$3(AbstractEventProcessor.java:170) ~[axon-messaging-4.9.0.jar!/:4.9.0]
   at org.axonframework.messaging.unitofwork.BatchingUnitOfWork.executeWithResult(BatchingUnitOfWork.java:92) ~[axon-messaging-4.9.0.jar!/:4.9.0]
   at org.axonframework.eventhandling.AbstractEventProcessor.lambda$processInUnitOfWork$4(AbstractEventProcessor.java:166) ~[axon-messaging-4.9.0.jar!/:4.9.0]
   at org.axonframework.tracing.Span.runCallable(Span.java:132) ~[axon-messaging-4.9.0.jar!/:4.9.0]
   at org.axonframework.eventhandling.AbstractEventProcessor.processInUnitOfWork(AbstractEventProcessor.java:165) ~[axon-messaging-4.9.0.jar!/:4.9.0]
   at org.axonframework.eventhandling.TrackingEventProcessor.processBatch(TrackingEventProcessor.java:491) ~[axon-messaging-4.9.0.jar!/:4.9.0]
   at org.axonframework.eventhandling.TrackingEventProcessor.processingLoop(TrackingEventProcessor.java:316) ~[axon-messaging-4.9.0.jar!/:4.9.0]
   at org.axonframework.eventhandling.TrackingEventProcessor$TrackingSegmentWorker.run(TrackingEventProcessor.java:1200) ~[axon-messaging-4.9.0.jar!/:4.9.0]
   at org.axonframework.eventhandling.TrackingEventProcessor$WorkerLauncher.cleanUp(TrackingEventProcessor.java:1402) ~[axon-messaging-4.9.0.jar!/:4.9.0]
   at org.axonframework.eventhandling.TrackingEventProcessor$WorkerLauncher.run(TrackingEventProcessor.java:1379) ~[axon-messaging-4.9.0.jar!/:4.9.0]
   at java.base/java.lang.Thread.run(Thread.java:840)

We identified and fixed the issue. However, the failure didn't stop the token sequence from advancing, so the failing event wasn't reprocessed. We had to nuke the topic and reprocess all events from the beginning.

Whether or not it makes sense to disable Kafka transactions, I can't really say. Conceptually, I think transactions make sense because you should be able to roll back a transaction if anything else goes wrong, to get exactly-once publishing. In our case, we wanted to guarantee exactly-once publishing because our consumers are not idempotent at the moment.

Sorry for not opening a separate issue; leaving a comment was just quicker. For the time being, we have worked around the issues and are monitoring for errors in case we need to replay events manually.

@gklijs

gklijs commented Nov 7, 2023

The problem is that it will never be exactly-once, because two different systems are involved. If it's working now, it's fine, I guess.
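One way to see this: with two independent commits (the Kafka transaction and the token store update), a crash between them forces a choice between losing an event and duplicating it. The toy model below (hypothetical DualWriteModel, in-memory stand-ins for both systems) commits to Kafka first, "crashes" before storing the token, and publishes the same event again on restart. A consumer that remembers processed event identifiers then absorbs the duplicate. This is a sketch of at-least-once delivery with an idempotent consumer, not something the extension provides.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of a dual write: the Kafka commit and the token update are separate steps.
final class DualWriteModel {
    final List<String> topic = new ArrayList<>(); // committed Kafka records
    int storedToken = 0;                          // position in the token store

    /** Publishes the event at storedToken, optionally "crashing" before the token is stored. */
    void processNext(List<String> events, boolean crashBeforeTokenUpdate) {
        String event = events.get(storedToken);
        topic.add(event);           // step 1: Kafka transaction committed
        if (crashBeforeTokenUpdate) {
            return;                 // step 2 never happens
        }
        storedToken++;              // step 2: token store updated
    }

    /** Idempotent consumer: skips event identifiers it has already handled. */
    static List<String> consumeIdempotently(List<String> records) {
        Set<String> seen = new HashSet<>();
        List<String> handled = new ArrayList<>();
        for (String id : records) {
            if (seen.add(id)) {
                handled.add(id);
            }
        }
        return handled;
    }
}
```

Reversing the order (token first, then Kafka) would turn the same crash into a lost event instead of a duplicate, which is why deduplication on the consumer side is the usual answer.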

@smcvb
Member

smcvb commented Nov 9, 2023

Hi @n3ziniuka5, just chipping in for this remark you made:

when I added the extension, it didn't publish any events. I am guessing it's related to AxonFramework/AxonFramework#2778. It only started publishing events after I manually changed the token type in the event store from ReplayToken to GlobalSequenceTrackingToken and set the token position to 0.

This is indeed why the handler didn't publish anything: an unforeseen side effect of making that adjustment in Axon Framework.
Right now, the KafkaEventPublisher of this extension, the component in charge of giving the AF events to a Kafka Producer, is not allowed to handle events during a replay.

Although it would be a small enhancement, it may be practical to provide a property to switch that behavior.
In general, though, not publishing old events to Kafka by default makes sense to me.
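A toy sketch of the behaviour under discussion: a publisher that skips events while the processor is replaying, plus the kind of opt-in switch suggested above. Everything here is hypothetical. The replay check is a simplification (an event counts as a replay while its index is at or below the position the token was reset from), and publishDuringReplay is an invented flag, not an existing extension property.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; not the KafkaEventPublisher or Axon's ReplayToken.
final class ReplayAwarePublisher {
    private final boolean publishDuringReplay; // invented opt-in switch
    final List<Long> published = new ArrayList<>();

    ReplayAwarePublisher(boolean publishDuringReplay) {
        this.publishDuringReplay = publishDuringReplay;
    }

    /** Simplified replay check: at or below the position the token was reset from. */
    static boolean isReplay(long eventIndex, long resetFromIndex) {
        return eventIndex <= resetFromIndex;
    }

    void handle(long eventIndex, long resetFromIndex) {
        if (isReplay(eventIndex, resetFromIndex) && !publishDuringReplay) {
            return; // default: old events are not sent to Kafka again
        }
        published.add(eventIndex);
    }
}
```

In this model, the workaround described earlier in the thread (replacing the ReplayToken with a plain token at position 0) corresponds roughly to passing a resetFromIndex below every event, such as -1, so nothing counts as a replay.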

@gklijs, what are your thoughts on this pointer? It would of course merit a new issue, but it seemed fair to me to hold the discussion here, as @n3ziniuka5 already dropped it here :-)

3 participants