diff --git a/cmd/standby_cleaner/main.go b/cmd/standby_cleaner/main.go index 069cdb02..075cab0e 100644 --- a/cmd/standby_cleaner/main.go +++ b/cmd/standby_cleaner/main.go @@ -25,15 +25,6 @@ func main() { os.Exit(1) } - // TODO - We should connect using the flypgadmin user so we can differentiate between - // internal admin connection usage and the actual repmgr process. - conn, err := flypgNode.RepMgr.NewLocalConnection(ctx) - if err != nil { - fmt.Printf("failed to open local connection: %s\n", err) - os.Exit(1) - } - defer conn.Close(ctx) - internal, err := flypg.ReadFromFile("/data/flypg.internal.conf") if err != nil { fmt.Printf("failed to open config: %s\n", err) @@ -64,45 +55,61 @@ func main() { for { select { case <-ticker.C: - role, err := flypgNode.RepMgr.CurrentRole(ctx, conn) - if err != nil { - fmt.Printf("Failed to check role: %s\n", err) - continue + if err := handleTick(ctx, flypgNode, seenAt, deadMemberRemovalThreshold); err != nil { + fmt.Println(err) } + } + } +} - if role != flypg.PrimaryRoleName { - continue - } +func handleTick(ctx context.Context, node *flypg.Node, seenAt map[int]time.Time, deadMemberRemovalThreshold time.Duration) error { + // TODO - We should connect using the flypgadmin user so we can differentiate between + // internal admin connection usage and the actual repmgr process. + conn, err := node.RepMgr.NewLocalConnection(ctx) + if err != nil { + fmt.Printf("failed to open local connection: %s\n", err) + os.Exit(1) + } + defer conn.Close(ctx) - standbys, err := flypgNode.RepMgr.Standbys(ctx, conn) - if err != nil { - fmt.Printf("Failed to query standbys: %s\n", err) - continue - } + role, err := node.RepMgr.CurrentRole(ctx, conn) + if err != nil { + return fmt.Errorf("failed to check role: %s", err) + } - for _, standby := range standbys { - newConn, err := flypgNode.RepMgr.NewRemoteConnection(ctx, standby.Ip) - defer newConn.Close(ctx) - if err != nil { - // TODO - Verify the exception that's getting thrown. - if time.Now().Sub(seenAt[standby.Id]) >= deadMemberRemovalThreshold { - if err := flypgNode.UnregisterMemberByID(ctx, int32(standby.Id)); err != nil { - fmt.Printf("failed to unregister member %d: %v\n", standby.Id, err.Error()) - continue - } + if role != flypg.PrimaryRoleName { + return nil + } - delete(seenAt, standby.Id) - } + standbys, err := node.RepMgr.Standbys(ctx, conn) + if err != nil { + return fmt.Errorf("failed to query standbys: %s", err) + } + for _, standby := range standbys { + // Wrap this in a function so connections are properly closed. + sConn, err := node.RepMgr.NewRemoteConnection(ctx, standby.Ip) + if err != nil { + // TODO - Verify the exception that's getting thrown. + if time.Now().Sub(seenAt[standby.Id]) >= deadMemberRemovalThreshold { + if err := node.UnregisterMemberByID(ctx, int32(standby.Id)); err != nil { + fmt.Printf("failed to unregister member %d: %v", standby.Id, err) continue } - seenAt[standby.Id] = time.Now() + delete(seenAt, standby.Id) } - removeOrphanedReplicationSlots(ctx, conn, standbys) + continue } + defer sConn.Close(ctx) + + seenAt[standby.Id] = time.Now() } + + removeOrphanedReplicationSlots(ctx, conn, standbys) + + return nil } func removeOrphanedReplicationSlots(ctx context.Context, conn *pgx.Conn, standbys []flypg.Standby) {