Version Information
Akka 1.5.14
Akka.Streams.Kafka 1.5.13.1
We ran into a strange error/behaviour using KafkaConsumer.CommittableSource.
The stream worked as expected for quite some time without any problems. Then we encountered out-of-sync problems between some of our applications, and we found in our log files that we were no longer receiving any messages from the KafkaConsumer.
Further research in our logs led us to the following log entries, which we are not able to interpret because they do not come from our application.
In our interpretation the second is a result of the first one, but we do not know where the first came from.
There is also a similar error log entry regarding Phobos in that timespan.
[14:32:38 ERR ] Error while executing scheduled task ScheduledWork(Deadline=6118411510800, RepeatEvery=0, Cancelled=False, System.InvalidOperationException: Nullable object must have a value.
   at Phobos.Actor.Instrumentation.PhobosRepointableActorRef.TellInternal(Object message, IActorRef sender)
   at Akka.Actor.HashedWheelTimerScheduler.Bucket.Execute(Int64 deadline)
The following is the code snippet where we use KafkaConsumer.CommittableSource.
var committerDefaults = CommitterSettings.Create(Context.System);

_control = KafkaConsumer.CommittableSource(consumerSettings, subscription)
    .SelectAsync(1, async elem =>
    {
        try
        {
            var res = await _kafkaConsumeRouter.Ask<IInternEventResult>(elem, TimeSpan.FromSeconds(10));
            if (res is InternEventFailure failure)
            {
                if (failure.Exception != null)
                    elem.Record.Message.Headers.Add("exception", Encoding.UTF8.GetBytes(failure.Exception.ToString()));
                if (failure.Error != null)
                    elem.Record.Message.Headers.Add("error", Encoding.UTF8.GetBytes(failure.Error));
                _kafkaDltActor.Tell(elem);
            }
        }
        catch (AskTimeoutException)
        {
            elem.Record.Message.Headers.Add("error", Encoding.UTF8.GetBytes("timeout"));
            _kafkaDltActor.Tell(elem);
        }
        return (ICommittable) elem.CommitableOffset;
    })
    .ToMaterialized(Committer.Sink(committerDefaults), DrainingControl<NotUsed>.Create)
    .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.StoppingDecider))
    .Run(Context.System);

_control.IsShutdown.ContinueWith(_ => new KafkaStreamStopped()).PipeTo(Self);
The KafkaConsumer.CommittableSource stream doesn't break or stop.
We have encountered this problem only once and weren't able to reproduce it.