Skip to content

Commit

Permalink
Merge pull request #41 from fly-apps/fix-connection-leak-2
Browse files Browse the repository at this point in the history
Push ticker logic into a separate function to address connection leak
  • Loading branch information
davissp14 authored Jan 19, 2023
2 parents 2e7cbbb + d32325d commit 3d43c9b
Showing 1 changed file with 42 additions and 35 deletions.
77 changes: 42 additions & 35 deletions cmd/standby_cleaner/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,6 @@ func main() {
os.Exit(1)
}

// TODO - We should connect using the flypgadmin user so we can differentiate between
// internal admin connection usage and the actual repmgr process.
conn, err := flypgNode.RepMgr.NewLocalConnection(ctx)
if err != nil {
fmt.Printf("failed to open local connection: %s\n", err)
os.Exit(1)
}
defer conn.Close(ctx)

internal, err := flypg.ReadFromFile("/data/flypg.internal.conf")
if err != nil {
fmt.Printf("failed to open config: %s\n", err)
Expand Down Expand Up @@ -64,45 +55,61 @@ func main() {
for {
select {
case <-ticker.C:
role, err := flypgNode.RepMgr.CurrentRole(ctx, conn)
if err != nil {
fmt.Printf("Failed to check role: %s\n", err)
continue
if err := handleTick(ctx, flypgNode, seenAt, deadMemberRemovalThreshold); err != nil {
fmt.Println(err)
}
}
}
}

if role != flypg.PrimaryRoleName {
continue
}
func handleTick(ctx context.Context, node *flypg.Node, seenAt map[int]time.Time, deadMemberRemovalThreshold time.Duration) error {
// TODO - We should connect using the flypgadmin user so we can differentiate between
// internal admin connection usage and the actual repmgr process.
conn, err := node.RepMgr.NewLocalConnection(ctx)
if err != nil {
fmt.Printf("failed to open local connection: %s\n", err)
os.Exit(1)
}
defer conn.Close(ctx)

standbys, err := flypgNode.RepMgr.Standbys(ctx, conn)
if err != nil {
fmt.Printf("Failed to query standbys: %s\n", err)
continue
}
role, err := node.RepMgr.CurrentRole(ctx, conn)
if err != nil {
return fmt.Errorf("failed to check role: %s", err)
}

for _, standby := range standbys {
newConn, err := flypgNode.RepMgr.NewRemoteConnection(ctx, standby.Ip)
defer newConn.Close(ctx)
if err != nil {
// TODO - Verify the exception that's getting thrown.
if time.Now().Sub(seenAt[standby.Id]) >= deadMemberRemovalThreshold {
if err := flypgNode.UnregisterMemberByID(ctx, int32(standby.Id)); err != nil {
fmt.Printf("failed to unregister member %d: %v\n", standby.Id, err.Error())
continue
}
if role != flypg.PrimaryRoleName {
return nil
}

delete(seenAt, standby.Id)
}
standbys, err := node.RepMgr.Standbys(ctx, conn)
if err != nil {
return fmt.Errorf("failed to query standbys: %s", err)
}

for _, standby := range standbys {
// Wrap this in a function so connections are properly closed.
sConn, err := node.RepMgr.NewRemoteConnection(ctx, standby.Ip)
if err != nil {
// TODO - Verify the exception that's getting thrown.
if time.Now().Sub(seenAt[standby.Id]) >= deadMemberRemovalThreshold {
if err := node.UnregisterMemberByID(ctx, int32(standby.Id)); err != nil {
fmt.Printf("failed to unregister member %d: %v", standby.Id, err)
continue
}

seenAt[standby.Id] = time.Now()
delete(seenAt, standby.Id)
}

removeOrphanedReplicationSlots(ctx, conn, standbys)
continue
}
defer sConn.Close(ctx)

seenAt[standby.Id] = time.Now()
}

removeOrphanedReplicationSlots(ctx, conn, standbys)

return nil
}

func removeOrphanedReplicationSlots(ctx context.Context, conn *pgx.Conn, standbys []flypg.Standby) {
Expand Down

0 comments on commit 3d43c9b

Please sign in to comment.