improve processor stability #473
Conversation
@@ -257,7 +257,7 @@ func (pp *PartitionProcessor) Start(setupCtx, ctx context.Context) error {
 		join := join
 		pp.runnerGroup.Go(func() error {
 			defer pp.state.SetState(PPStateStopping)
-			return join.CatchupForever(runnerCtx, false)
+			return join.CatchupForever(runnerCtx, true)
This makes the join table try to reconnect while the processor is running (together with the other table a few lines below).
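Roughly what the restart-on-error flag boils down to; a hedged sketch in which `catchupForever`, `restartOnError` and the backoff interval are illustrative names, not goka's actual internals:

```go
package sketch

import (
	"context"
	"time"
)

// catchupForever stands in for the join table's catch-up loop. With
// restartOnError=true it retries after transient failures (e.g. a broker
// restart) instead of propagating the error and tearing the processor down.
func catchupForever(ctx context.Context, restartOnError bool, catchup func(context.Context) error) error {
	for {
		err := catchup(ctx)
		if err == nil || !restartOnError {
			return err
		}
		// back off before reconnecting so a flapping broker doesn't spin the loop
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(5 * time.Second):
		}
	}
}
```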
 	if pp.cancelRunnerGroup != nil {
 		pp.cancelRunnerGroup()
 	}

 	// wait for the runner to be done
 	runningErrs := multierror.Append(pp.runnerGroup.Wait().ErrorOrNil())

 	close(pp.input)
Channels are now closed after the runner group is done; visitors attach to the runner group for this.
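A minimal, self-contained sketch of that shutdown ordering (generic names, not goka's types): cancel first, wait for every goroutine attached to the group, and only then close the channel they write to.

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	input := make(chan int)

	g, gctx := errgroup.WithContext(ctx)
	g.Go(func() error { // stands in for a visitor/runner writing to input
		for {
			select {
			case <-gctx.Done():
				return nil
			case input <- 1:
			}
		}
	})

	// the consumer is not part of the group: it stops once the channel is closed
	done := make(chan struct{})
	go func() {
		defer close(done)
		for range input {
		}
	}()

	cancel()     // 1) ask all writers to stop
	_ = g.Wait() // 2) wait until every writer has returned
	close(input) // 3) only now is closing safe: nobody can send anymore
	<-done
	fmt.Println("shut down cleanly")
}
```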
@@ -637,15 +637,6 @@ func (pp *PartitionProcessor) VisitValues(ctx context.Context, name string, meta

 	var wg sync.WaitGroup

-	// drains the channel and drops out when closed.
There was actually no point in distinguishing between draining until the channel is closed and draining until it's empty, because this function is the one writing to the channel.
If two visitors are started at the same time and one of them panics or is stopped, it will drain the other's messages too - but that issue existed before, so we'll ignore it here :)
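For illustration, the two draining variants this refers to; hypothetical helpers, not goka's code. Since VisitValues itself is the writer, only the drain-until-empty form can apply here, which is why the distinction was dropped.

```go
package sketch

// drainUntilClosed blocks until the writer closes the channel,
// discarding everything that arrives in the meantime.
func drainUntilClosed(ch <-chan interface{}) {
	for range ch {
	}
}

// drainUntilEmpty returns as soon as nothing is buffered,
// even if the writer keeps producing afterwards.
func drainUntilEmpty(ch <-chan interface{}) {
	for {
		select {
		case _, ok := <-ch:
			if !ok {
				return
			}
		default:
			return
		}
	}
}
```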
@@ -421,11 +421,11 @@ func (g *Processor) handleSessionErrors(ctx, sessionCtx context.Context, session
 	)

 	if errors.As(err, &errProc) {
-		g.log.Debugf("error processing message (non-transient), shutting down processor: %v", err)
+		g.log.Printf("error processing message (non-transient), shutting down processor: %v", err)
Let's not log these important errors at debug level.
@@ -89,7 +89,7 @@ func checkBroker(broker Broker, config *sarama.Config) error {
 	}

 	err := broker.Open(config)
-	if err != nil {
+	if err != nil && !errors.Is(err, sarama.ErrAlreadyConnected) {
According to the docs, Open might return this if the broker is already connected, and that's not an error.
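Roughly what the resulting check looks like; a sketch based on the diff above, using *sarama.Broker directly (the real function takes goka's Broker interface) and assuming the Shopify sarama import path:

```go
package sketch

import (
	"errors"
	"fmt"

	"github.com/Shopify/sarama"
)

// checkBroker opens the connection and verifies it, treating
// sarama.ErrAlreadyConnected as success rather than failure.
func checkBroker(broker *sarama.Broker, config *sarama.Config) error {
	err := broker.Open(config)
	if err != nil && !errors.Is(err, sarama.ErrAlreadyConnected) {
		return fmt.Errorf("error opening broker connection: %w", err)
	}

	connected, err := broker.Connected()
	if err != nil {
		return fmt.Errorf("cannot check broker connection: %w", err)
	}
	if !connected {
		return fmt.Errorf("broker %s is not connected", broker.Addr())
	}
	return nil
}
```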
@@ -18,6 +18,7 @@ func TestProcessorShutdown_KafkaDisconnect(t *testing.T) {
 	brokers := initSystemTest(t)
 	var (
 		topic = goka.Stream(fmt.Sprintf("goka_systemtest_proc_shutdown_disconnect-%d", time.Now().Unix()))
+		join  = goka.Stream(fmt.Sprintf("goka_systemtest_proc_shutdown_disconnect-%d-join", time.Now().Unix()))
Adding some join tables to the tests so we can test the reconnecting-joins change from above.
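A hedged sketch of how a join edge could be wired into such a test's group graph, reusing the topic and join variables from the diff; the group name, codec and callback body are illustrative, not the exact test code, and the resulting graph would then be passed to goka.NewProcessor:

```go
g := goka.DefineGroup(
	goka.Group("goka-systemtest-shutdown-disconnect"),
	goka.Input(topic, new(codec.Int64), func(ctx goka.Context, msg interface{}) {
		// looking the key up in the join table on every message exercises
		// the reconnecting join while brokers are being restarted
		_ = ctx.Join(goka.Table(join))
		ctx.SetValue(msg)
	}),
	goka.Join(goka.Table(join), new(codec.Int64)),
	goka.Persist(new(codec.Int64)),
)
```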
2de9c0e to 17448ff (Compare)
What this PR tries to improve
Stability of processors in the face of a restarting/unstable Kafka cluster.
Background
We're facing the issue that our Kafka cluster restarts or rebalances from time to time, which makes all processors restart. Since the processors will rebalance anyway, this PR uses reconnecting views for the join/lookup tables.
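For standalone lookups, goka already exposes an auto-reconnect option on views; a minimal sketch of using it (broker list, table name and codec are placeholders, assuming the current goka API), while this PR wires the equivalent behaviour into the processor's internal join/lookup tables:

```go
package main

import (
	"context"
	"log"

	"github.com/lovoo/goka"
	"github.com/lovoo/goka/codec"
)

func main() {
	brokers := []string{"localhost:9092"} // placeholder broker list

	// With auto-reconnect the view keeps retrying instead of terminating
	// when the cluster goes away.
	view, err := goka.NewView(
		brokers,
		goka.GroupTable("my-lookup-group"), // hypothetical lookup table
		new(codec.String),
		goka.WithViewAutoReconnect(),
	)
	if err != nil {
		log.Fatalf("error creating view: %v", err)
	}
	if err := view.Run(context.Background()); err != nil {
		log.Fatalf("view terminated with error: %v", err)
	}
}
```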