Skip to content

Commit

Permalink
Deregister standbys after x mins of inactivity
Browse files Browse the repository at this point in the history
  • Loading branch information
DAlperin committed Dec 21, 2022
1 parent c8c7066 commit a08cf4c
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 0 deletions.
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ WORKDIR /go/src/github.com/fly-examples/fly-postgres
COPY . .

RUN CGO_ENABLED=0 GOOS=linux go build -v -o /fly/bin/event_handler ./cmd/event_handler
RUN CGO_ENABLED=0 GOOS=linux go build -v -o /fly/bin/standby_cleaner ./cmd/standby_cleaner
RUN CGO_ENABLED=0 GOOS=linux go build -v -o /fly/bin/start ./cmd/start
COPY ./bin/* /fly/bin/

Expand Down
63 changes: 63 additions & 0 deletions cmd/standby_cleaner/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package main

import (
"context"
"fmt"
"github.com/fly-apps/postgres-flex/pkg/flypg"
"os"
"time"
)

var Minute int64 = 60

func main() {
ctx := context.Background()
flypgNode, err := flypg.NewNode()
if err != nil {
fmt.Printf("failed to reference node: %s\n", err)
os.Exit(1)
}

conn, err := flypgNode.RepMgr.NewLocalConnection(ctx)
if err != nil {
fmt.Printf("failed to open local connection: %s\n", err)
os.Exit(1)
}

ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()

seenAt := map[int]int64{}

for _ = range ticker.C {
role, err := flypgNode.RepMgr.CurrentRole(ctx, conn)
if err != nil {
fmt.Printf("Failed to check role: %s", err)
continue
}
if role != "primary" {
continue
}
standbys, err := flypgNode.RepMgr.Standbys(ctx, conn)
if err != nil {
fmt.Printf("Failed to get standbys: %s", err)
continue
}
for _, standby := range standbys {
newConn, err := flypgNode.RepMgr.NewRemoteConnection(ctx, standby.Ip)
if err != nil {
if time.Now().Unix()-seenAt[standby.Id] >= 10*Minute {
err := flypgNode.RepMgr.UnregisterStandby(standby.Id)
if err != nil {
fmt.Printf("Failed to unregister %d: %s", standby.Id, err)
continue
}
delete(seenAt, standby.Id)
}
} else {
seenAt[standby.Id] = time.Now().Unix()
newConn.Close(ctx)
}
}
}
}
1 change: 1 addition & 0 deletions cmd/start/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func main() {
svisor.AddProcess("repmgrd", fmt.Sprintf("gosu postgres repmgrd -f %s --daemonize=false", node.RepMgr.ConfigPath),
supervisor.WithRestart(0, 5*time.Second),
)
svisor.AddProcess("standby_cleaner", "/usr/local/bin/standby_cleaner", supervisor.WithRestart(0, 5*time.Second))

exporterEnv := map[string]string{
"DATA_SOURCE_URI": fmt.Sprintf("[%s]:%d/postgres?sslmode=disable", node.PrivateIP, node.Port),
Expand Down
36 changes: 36 additions & 0 deletions pkg/flypg/repmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ func (r *RepMgr) CurrentRole(ctx context.Context, pg *pgx.Conn) (string, error)
return r.memberRole(ctx, pg, int(r.ID))
}

func (r *RepMgr) Standbys(ctx context.Context, pg *pgx.Conn) ([]Standby, error) {
return r.standbyStatuses(ctx, pg, int(r.ID))
}

func (r *RepMgr) writeManagerConf() error {
file, err := os.OpenFile(r.ConfigPath, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644)
if err != nil {
Expand Down Expand Up @@ -145,6 +149,15 @@ func (r *RepMgr) registerStandby() error {
return nil
}

func (r *RepMgr) UnregisterStandby(id int) error {
cmdStr := fmt.Sprintf("repmgr standby unregister -f %s --node-id=%d", r.ConfigPath, id)
if err := runCommand(cmdStr); err != nil {
fmt.Printf("failed to unregister standby: %s", err)
}

return nil
}

func (r *RepMgr) clonePrimary(ipStr string) error {
cmdStr := fmt.Sprintf("mkdir -p %s", r.DataDir)
if err := runCommand(cmdStr); err != nil {
Expand Down Expand Up @@ -184,6 +197,29 @@ func (r *RepMgr) writePasswdConf() error {
return nil
}

type Standby struct {
Id int
Ip string
}

func (r *RepMgr) standbyStatuses(ctx context.Context, pg *pgx.Conn, id int) ([]Standby, error) {
sql := fmt.Sprintf("select node_id, node_name from repmgr.show_nodes where type = 'standby' and upstream_node_id = '%d';", id)
var standbys []Standby
rows, err := pg.Query(ctx, sql)
if err != nil {
return nil, err
}
for rows.Next() {
var s Standby
err := rows.Scan(&s.Id, &s.Ip)
if err != nil {
return nil, err
}
standbys = append(standbys, s)
}
return standbys, nil
}

func (r *RepMgr) memberRole(ctx context.Context, pg *pgx.Conn, id int) (string, error) {
sql := fmt.Sprintf("select n.type from repmgr.nodes n LEFT JOIN repmgr.nodes un ON un.node_id = n.upstream_node_id WHERE n.node_id = '%d';", id)
var role string
Expand Down

0 comments on commit a08cf4c

Please sign in to comment.