Skip to content

Commit

Permalink
Deregister standbys after x mins of inactivity
Browse files Browse the repository at this point in the history
  • Loading branch information
DAlperin committed Dec 21, 2022
1 parent c8c7066 commit 53ad4ae
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 10 deletions.
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ WORKDIR /go/src/github.com/fly-examples/fly-postgres
COPY . .

RUN CGO_ENABLED=0 GOOS=linux go build -v -o /fly/bin/event_handler ./cmd/event_handler
RUN CGO_ENABLED=0 GOOS=linux go build -v -o /fly/bin/standby_cleaner ./cmd/standby_cleaner
RUN CGO_ENABLED=0 GOOS=linux go build -v -o /fly/bin/start ./cmd/start
COPY ./bin/* /fly/bin/

Expand Down
63 changes: 63 additions & 0 deletions cmd/standby_cleaner/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package main

import (
"context"
"fmt"
"github.com/fly-apps/postgres-flex/pkg/flypg"
"os"
"time"
)

var Minute int64 = 60

func main() {
ctx := context.Background()
flypgNode, err := flypg.NewNode()
if err != nil {
fmt.Printf("failed to reference node: %s\n", err)
os.Exit(1)
}

conn, err := flypgNode.RepMgr.NewLocalConnection(ctx)
if err != nil {
fmt.Printf("failed to open local connection: %s\n", err)
os.Exit(1)
}

ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()

seenAt := map[int]int64{}

for _ = range ticker.C {
role, err := flypgNode.RepMgr.CurrentRole(ctx, conn)
if err != nil {
fmt.Printf("Failed to check role: %s", err)
continue
}
if role != "primary" {
continue
}
standbys, err := flypgNode.RepMgr.Standbys(ctx, conn)
if err != nil {
fmt.Printf("Failed to get standbys: %s", err)
continue
}
for _, standby := range standbys {
newConn, err := flypgNode.RepMgr.NewRemoteConnection(ctx, standby.Ip)
if err != nil {
if time.Now().Unix()-seenAt[standby.Id] >= 10*Minute {
err := flypg.RunCommand(fmt.Sprintf("repmgr standby unregister -f %s --node-id=%d", flypgNode.RepMgr.ConfigPath, standby.Id))
if err != nil {
fmt.Printf("Failed to deregister %d: %s", standby.Id, err)
continue
}
delete(seenAt, standby.Id)
}
} else {
seenAt[standby.Id] = time.Now().Unix()
newConn.Close(ctx)
}
}
}
}
1 change: 1 addition & 0 deletions cmd/start/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func main() {
svisor.AddProcess("repmgrd", fmt.Sprintf("gosu postgres repmgrd -f %s --daemonize=false", node.RepMgr.ConfigPath),
supervisor.WithRestart(0, 5*time.Second),
)
svisor.AddProcess("standby_cleaner", "/usr/local/bin/standby_cleaner", supervisor.WithRestart(0, 5*time.Second))

exporterEnv := map[string]string{
"DATA_SOURCE_URI": fmt.Sprintf("[%s]:%d/postgres?sslmode=disable", node.PrivateIP, node.Port),
Expand Down
2 changes: 1 addition & 1 deletion pkg/flypg/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (c *Config) Print(w io.Writer) error {
}

func (c Config) EnableCustomConfig() error {
if err := runCommand(fmt.Sprintf("touch %s", c.customConfigFilePath)); err != nil {
if err := RunCommand(fmt.Sprintf("touch %s", c.customConfigFilePath)); err != nil {
return err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/flypg/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ func setDirOwnership() error {
return err
}

func runCommand(cmdStr string) error {
func RunCommand(cmdStr string) error {
pgUser, err := user.Lookup("postgres")
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions pkg/flypg/pgbouncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,15 @@ func (p *PGBouncer) ConfigurePrimary(ctx context.Context, primary string, reload

func (p *PGBouncer) initialize() error {
cmdStr := fmt.Sprintf("mkdir -p %s", p.ConfigPath)
if err := runCommand(cmdStr); err != nil {
if err := RunCommand(cmdStr); err != nil {
return err
}

// If pgbouncer.ini file is not present, set defaults.
if _, err := os.Stat(fmt.Sprintf("%s/pgbouncer.ini", p.ConfigPath)); err != nil {
if os.IsNotExist(err) {
cmdStr := fmt.Sprintf("cp /fly/pgbouncer.ini %s", p.ConfigPath)
if err := runCommand(cmdStr); err != nil {
if err := RunCommand(cmdStr); err != nil {
return err
}
} else {
Expand Down
39 changes: 33 additions & 6 deletions pkg/flypg/repmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ func (r *RepMgr) CurrentRole(ctx context.Context, pg *pgx.Conn) (string, error)
return r.memberRole(ctx, pg, int(r.ID))
}

func (r *RepMgr) Standbys(ctx context.Context, pg *pgx.Conn) ([]Standby, error) {
return r.standbyStatuses(ctx, pg, int(r.ID))
}

func (r *RepMgr) writeManagerConf() error {
file, err := os.OpenFile(r.ConfigPath, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644)
if err != nil {
Expand Down Expand Up @@ -110,7 +114,7 @@ func (r *RepMgr) writeManagerConf() error {

func (r *RepMgr) registerPrimary() error {
cmdStr := fmt.Sprintf("repmgr -f %s primary register -F -v", r.ConfigPath)
if err := runCommand(cmdStr); err != nil {
if err := RunCommand(cmdStr); err != nil {
return err
}

Expand All @@ -119,7 +123,7 @@ func (r *RepMgr) registerPrimary() error {

func (r *RepMgr) unregisterPrimary() error {
cmdStr := fmt.Sprintf("repmgr -f %s primary unregister", r.ConfigPath)
if err := runCommand(cmdStr); err != nil {
if err := RunCommand(cmdStr); err != nil {
return err
}

Expand All @@ -128,7 +132,7 @@ func (r *RepMgr) unregisterPrimary() error {

func (r *RepMgr) followPrimary() error {
cmdStr := fmt.Sprintf("repmgr -f %s standby follow", r.ConfigPath)
if err := runCommand(cmdStr); err != nil {
if err := RunCommand(cmdStr); err != nil {
fmt.Printf("failed to register standby: %s", err)
}

Expand All @@ -138,7 +142,7 @@ func (r *RepMgr) followPrimary() error {
func (r *RepMgr) registerStandby() error {
// Force re-registry to ensure the standby picks up any new configuration changes.
cmdStr := fmt.Sprintf("repmgr -f %s standby register -F", r.ConfigPath)
if err := runCommand(cmdStr); err != nil {
if err := RunCommand(cmdStr); err != nil {
fmt.Printf("failed to register standby: %s", err)
}

Expand All @@ -147,7 +151,7 @@ func (r *RepMgr) registerStandby() error {

func (r *RepMgr) clonePrimary(ipStr string) error {
cmdStr := fmt.Sprintf("mkdir -p %s", r.DataDir)
if err := runCommand(cmdStr); err != nil {
if err := RunCommand(cmdStr); err != nil {
return err
}

Expand All @@ -159,7 +163,7 @@ func (r *RepMgr) clonePrimary(ipStr string) error {
r.ConfigPath)

fmt.Println(cmdStr)
return runCommand(cmdStr)
return RunCommand(cmdStr)
}

func (r *RepMgr) writePasswdConf() error {
Expand All @@ -184,6 +188,29 @@ func (r *RepMgr) writePasswdConf() error {
return nil
}

type Standby struct {
Id int
Ip string
}

func (r *RepMgr) standbyStatuses(ctx context.Context, pg *pgx.Conn, id int) ([]Standby, error) {
sql := fmt.Sprintf("select node_id, node_name from repmgr.show_nodes where type = 'standby' and upstream_node_id = '%d';", id)
var standbys []Standby
rows, err := pg.Query(ctx, sql)
if err != nil {
return nil, err
}
for rows.Next() {
var s Standby
err := rows.Scan(&s.Id, &s.Ip)
if err != nil {
return nil, err
}
standbys = append(standbys, s)
}
return standbys, nil
}

func (r *RepMgr) memberRole(ctx context.Context, pg *pgx.Conn, id int) (string, error) {
sql := fmt.Sprintf("select n.type from repmgr.nodes n LEFT JOIN repmgr.nodes un ON un.node_id = n.upstream_node_id WHERE n.node_id = '%d';", id)
var role string
Expand Down

0 comments on commit 53ad4ae

Please sign in to comment.