Skip to content

Commit

Permalink
extract loggic in an explicit function
Browse files Browse the repository at this point in the history
  • Loading branch information
sakoush committed Sep 30, 2024
1 parent 6f740f9 commit 2ed606c
Showing 1 changed file with 30 additions and 25 deletions.
55 changes: 30 additions & 25 deletions operator/scheduler/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,32 +147,8 @@ func (s *SchedulerClient) startEventHanders(namespace string, conn *grpc.ClientC
<-ch
// absorb signals from the same disconnected go routines
time.Sleep(1 * time.Second)
// on new reconnects we send a list of servers to the schedule
err := retryFn(s.handleRegisteredServers, conn, namespace, s.logger.WithName("handleRegisteredServers"))
if err != nil {
s.logger.Error(err, "Failed to send registered server to scheduler")
}

if err == nil {
err = retryFn(s.handleExperiments, conn, namespace, s.logger.WithName("handleExperiments"))
if err != nil {
s.logger.Error(err, "Failed to send experiments to scheduler")
}
}

if err == nil {
err = retryFn(s.handlePipelines, conn, namespace, s.logger.WithName("handlePipelines"))
if err != nil {
s.logger.Error(err, "Failed to send pipelines to scheduler")
}
}

if err == nil {
err = retryFn(s.handleModels, conn, namespace, s.logger.WithName("handleModels"))
if err != nil {
s.logger.Error(err, "Failed to send models to scheduler")
}
}
s.handleReconnect(conn, namespace)

triggered.Store(false)
}
Expand All @@ -181,6 +157,35 @@ func (s *SchedulerClient) startEventHanders(namespace string, conn *grpc.ClientC
ch <- struct{}{} // initial trigger
}

func (s *SchedulerClient) handleReconnect(conn *grpc.ClientConn, namespace string) {
// on new reconnects we send a list of servers to the schedule
err := retryFn(s.handleRegisteredServers, conn, namespace, s.logger.WithName("handleRegisteredServers"))
if err != nil {
s.logger.Error(err, "Failed to send registered server to scheduler")
}

if err == nil {
err = retryFn(s.handleExperiments, conn, namespace, s.logger.WithName("handleExperiments"))
if err != nil {
s.logger.Error(err, "Failed to send experiments to scheduler")
}
}

if err == nil {
err = retryFn(s.handlePipelines, conn, namespace, s.logger.WithName("handlePipelines"))
if err != nil {
s.logger.Error(err, "Failed to send pipelines to scheduler")
}
}

if err == nil {
err = retryFn(s.handleModels, conn, namespace, s.logger.WithName("handleModels"))
if err != nil {
s.logger.Error(err, "Failed to send models to scheduler")
}
}
}

func (s *SchedulerClient) RemoveConnection(namespace string) {
s.mu.Lock()
defer s.mu.Unlock()
Expand Down

0 comments on commit 2ed606c

Please sign in to comment.