improve processor stability #473

Merged
frairon merged 1 commit into master from processor-stability
Feb 26, 2025

@frairon frairon commented Feb 25, 2025

What this PR tries to improve

Stability of processors in the face of a restarting or unstable Kafka cluster.

Background

We're facing the issue that our Kafka cluster restarts or rebalances from time to time, which makes all processors restart. Since the processors will rebalance anyway, this PR uses reconnecting views for the join/lookup tables.

@@ -257,7 +257,7 @@ func (pp *PartitionProcessor) Start(setupCtx, ctx context.Context) error {
     join := join
     pp.runnerGroup.Go(func() error {
         defer pp.state.SetState(PPStateStopping)
-        return join.CatchupForever(runnerCtx, false)
+        return join.CatchupForever(runnerCtx, true)

This makes the join table try to reconnect while the processor is running (together with the other table some lines below).

if pp.cancelRunnerGroup != nil {
pp.cancelRunnerGroup()
}

// wait for the runner to be done
runningErrs := multierror.Append(pp.runnerGroup.Wait().ErrorOrNil())

close(pp.input)

Channels are now closed only after the runner group is done; visitors attach to the runner group for this.

@@ -637,15 +637,6 @@ func (pp *PartitionProcessor) VisitValues(ctx context.Context, name string, meta

var wg sync.WaitGroup

// drains the channel and drops out when closed.

There was actually no point in distinguishing between draining until close and draining until empty, because this function is the one writing to the channel.
If two visitors are started at the same time and one of them panics or is stopped, it will drain the other's messages too, but that issue existed before, so we'll ignore it here :)
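As a rough illustration of "draining until it's empty", here is a minimal non-blocking drain. The helper name `drain` and the `int` element type are assumptions for this sketch; it is not the code from `VisitValues`.

```go
package main

import "fmt"

// drain discards everything currently buffered in ch and reports how many
// items were dropped. It returns as soon as the channel is empty or closed,
// so it never blocks waiting for new messages.
func drain(ch chan int) int {
	dropped := 0
	for {
		select {
		case _, ok := <-ch:
			if !ok {
				// Channel closed and fully consumed.
				return dropped
			}
			dropped++
		default:
			// Nothing buffered right now: treat the channel as drained.
			return dropped
		}
	}
}

func main() {
	ch := make(chan int, 4)
	ch <- 1
	ch <- 2
	ch <- 3
	fmt.Println("dropped:", drain(ch))
}
```

This only works cleanly when the caller is the sole writer and has already stopped writing, which is exactly the caveat about two concurrent visitors mentioned above.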

@@ -421,11 +421,11 @@ func (g *Processor) handleSessionErrors(ctx, sessionCtx context.Context, session
)

     if errors.As(err, &errProc) {
-        g.log.Debugf("error processing message (non-transient), shutting down processor: %v", err)
+        g.log.Printf("error processing message (non-transient), shutting down processor: %v", err)

Let's not log those important errors at debug level.

@@ -89,7 +89,7 @@ func checkBroker(broker Broker, config *sarama.Config) error {
}

     err := broker.Open(config)
-    if err != nil {
+    if err != nil && !errors.Is(err, sarama.ErrAlreadyConnected) {

According to the docs, Open might return this if the broker is already connected, and that is not an error.

@@ -18,6 +18,7 @@ func TestProcessorShutdown_KafkaDisconnect(t *testing.T) {
     brokers := initSystemTest(t)
     var (
         topic = goka.Stream(fmt.Sprintf("goka_systemtest_proc_shutdown_disconnect-%d", time.Now().Unix()))
+        join = goka.Stream(fmt.Sprintf("goka_systemtest_proc_shutdown_disconnect-%d-join", time.Now().Unix()))

adding some join tables to the tests so we can test the reconnecting joins change from above.

@frairon frairon force-pushed the processor-stability branch from 2de9c0e to 17448ff Compare February 25, 2025 08:00
@frairon frairon merged commit 3a26dac into master Feb 26, 2025
5 checks passed
@frairon frairon deleted the processor-stability branch February 26, 2025 05:36