Skip to content

Commit

Permalink
Merge pull request #67 from fly-apps/runtime-zombie-analysis-2
Browse files Browse the repository at this point in the history
Evaluate quorum at runtime
  • Loading branch information
davissp14 authored Feb 6, 2023
2 parents cbe1103 + c158f2f commit 4d211f4
Show file tree
Hide file tree
Showing 7 changed files with 122 additions and 38 deletions.
83 changes: 83 additions & 0 deletions cmd/event_handler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"errors"
"flag"
"fmt"
"log"
Expand All @@ -10,6 +11,7 @@ import (
"time"

"github.com/fly-apps/postgres-flex/internal/flypg"
"github.com/jackc/pgx/v5"
)

const eventLogFile = "/data/event.log"
Expand All @@ -26,6 +28,8 @@ func main() {
details := flag.String("details", "", "details")
flag.Parse()

ctx := context.Background()

logFile, err := os.OpenFile(eventLogFile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
fmt.Printf("failed to open event log: %s", err)
Expand Down Expand Up @@ -91,6 +95,38 @@ func main() {
os.Exit(1)
}

case "child_node_disconnect", "child_node_reconnect", "child_node_new_connect":
node, err := flypg.NewNode()
if err != nil {
log.Printf("failed to initialize node: %s", err)
os.Exit(1)
}

conn, err := node.RepMgr.NewLocalConnection(ctx)
if err != nil {
log.Printf("failed to open local connection: %s", err)
os.Exit(1)
}
defer conn.Close(ctx)

member, err := node.RepMgr.Member(ctx, conn)
if err != nil {
log.Printf("failed to resolve member: %s", err)
os.Exit(1)
}

if member.Role != flypg.PrimaryRoleName {
// We should never get here.
log.Println("skipping since we are not the primary")
os.Exit(0)
}

if err := evaluateClusterState(ctx, conn, node); err != nil {
log.Printf("failed to evaluate cluster state: %s", err)
os.Exit(0)
}

os.Exit(0)
default:
// noop
}
Expand Down Expand Up @@ -118,3 +154,50 @@ func reconfigurePGBouncer(id int) error {

return nil
}

func evaluateClusterState(ctx context.Context, conn *pgx.Conn, node *flypg.Node) error {
standbys, err := node.RepMgr.StandbyMembers(ctx, conn)
if err != nil {
if !errors.Is(err, pgx.ErrNoRows) {
return fmt.Errorf("failed to query standbys")
}
}

sample, err := flypg.TakeDNASample(ctx, node, standbys)
if err != nil {
return fmt.Errorf("failed to evaluate cluster data: %s", err)
}

log.Println(flypg.DNASampleString(sample))

primary, err := flypg.ZombieDiagnosis(sample)
if errors.Is(err, flypg.ErrZombieDiagnosisUndecided) || errors.Is(err, flypg.ErrZombieDiscovered) {
// Quarantine primary
if err := flypg.Quarantine(ctx, conn, node, primary); err != nil {
return fmt.Errorf("failed to quarantine failed primary: %s", err)
}

return fmt.Errorf("primary has been quarantined: %s", err)
} else if err != nil {
return fmt.Errorf("failed to run zombie diagnosis: %s", err)
}

// Clear zombie lock if it exists
if flypg.ZombieLockExists() {
log.Println("Clearing zombie lock and enabling read/write")
if err := flypg.RemoveZombieLock(); err != nil {
return fmt.Errorf("failed to remove zombie lock: %s", err)
}

log.Println("Broadcasting readonly state change")
if err := flypg.BroadcastReadonlyChange(ctx, node, false); err != nil {
log.Printf("errors while disabling readonly: %s", err)
}
}

if err := node.PGBouncer.ConfigurePrimary(ctx, primary, true); err != nil {
return fmt.Errorf("failed to reconfigure pgbouncer primary %s", err)
}

return nil
}
40 changes: 10 additions & 30 deletions internal/flypg/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (n *Node) Init(ctx context.Context) error {

if ZombieLockExists() {
fmt.Println("Zombie lock detected!")
primaryStr, err := readZombieLock()
primaryStr, err := ReadZombieLock()
if err != nil {
return fmt.Errorf("failed to read zombie lock: %s", primaryStr)
}
Expand All @@ -172,7 +172,7 @@ func (n *Node) Init(ctx context.Context) error {
// Confirm that our rejoin target still identifies itself as the primary.
if primary.Hostname != ip.String() {
// Clear the zombie.lock file so we can attempt to re-resolve the correct primary.
if err := removeZombieLock(); err != nil {
if err := RemoveZombieLock(); err != nil {
return fmt.Errorf("failed to remove zombie lock: %s", err)
}

Expand All @@ -191,7 +191,7 @@ func (n *Node) Init(ctx context.Context) error {

// TODO - Wait for target cluster to register self as a standby.

if err := removeZombieLock(); err != nil {
if err := RemoveZombieLock(); err != nil {
return fmt.Errorf("failed to remove zombie lock: %s", err)
}

Expand Down Expand Up @@ -348,45 +348,25 @@ func (n *Node) PostInit(ctx context.Context) error {
return fmt.Errorf("failed to resolve cluster metrics: %s", err)
}

printDNASample(sample)
fmt.Println(DNASampleString(sample))

// Evaluate whether we are a zombie or not.
primary, err := ZombieDiagnosis(sample)
if errors.Is(err, ErrZombieDiagnosisUndecided) {
fmt.Println("Unable to confirm that we are the true primary!")

fmt.Println("Writing zombie.lock file.")
if err := writeZombieLock(""); err != nil {
return fmt.Errorf("failed to set zombie lock: %s", err)
if err := Quarantine(ctx, conn, n, primary); err != nil {
return fmt.Errorf("failed to quarantine failed primary: %s", err)
}

fmt.Println("Turning all user-created databases readonly.")
if err := BroadcastReadonlyChange(ctx, n, true); err != nil {
return fmt.Errorf("failed to set read-only: %s", err)
}

// TODO - Add link to docs
fmt.Println("Please refer to following documentation for more information: <insert-doc-link-here>.")

} else if errors.Is(err, ErrZombieDiscovered) {
fmt.Println("Zombie primary discovered!")
fmt.Printf("The majority of registered members agree that '%s' is the real primary.\n", primary)

fmt.Printf("Reconfiguring PGBouncer to point to '%s'\n", primary)
if err := n.PGBouncer.ConfigurePrimary(ctx, primary, true); err != nil {
return fmt.Errorf("failed to reconfigure pgbouncer: %s", err)
}

fmt.Println("Writing zombie.lock file")
if err := writeZombieLock(primary); err != nil {
return fmt.Errorf("failed to set zombie lock: %s", err)
}

fmt.Println("Turning user-created databases read-only")
if err := BroadcastReadonlyChange(ctx, n, true); err != nil {
return fmt.Errorf("failed to set read-only: %s", err)
if err := Quarantine(ctx, conn, n, primary); err != nil {
return fmt.Errorf("failed to quarantine failed primary: %s", err)
}

// Issue panic to force a process restart so we can attempt to rejoin
// the the cluster we've diverged from.
panic(err)
} else if err != nil {
return fmt.Errorf("failed to run zombie diagnosis: %s", err)
Expand Down
2 changes: 1 addition & 1 deletion internal/flypg/pgbouncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (p *PGBouncer) ConfigurePrimary(ctx context.Context, primary string, reload
if reload {
err = p.reloadConfig(ctx)
if err != nil {
fmt.Printf("failed to reconfigure pgbouncer primary %s\n", err)
return fmt.Errorf("failed to reconfigure pgbouncer primary: %s", err)
}
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion internal/flypg/readonly.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func changeReadOnlyState(ctx context.Context, n *Node, enable bool) error {

databases, err := admin.ListDatabases(ctx, conn)
if err != nil {
return err
return fmt.Errorf("failed to list database: %s", err)
}

var dbNames []string
Expand Down
2 changes: 1 addition & 1 deletion internal/flypg/repmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (r *RepMgr) setDefaults() {
"promote_command": fmt.Sprintf("'repmgr standby promote -f %s --log-to-file'", r.ConfigPath),
"follow_command": fmt.Sprintf("'repmgr standby follow -f %s --log-to-file --upstream-node-id=%%n'", r.ConfigPath),
"event_notification_command": fmt.Sprintf("'/usr/local/bin/event_handler -node-id %%n -event %%e -success %%s -details \"%%d\" -new-node-id \\'%%p\\''"),
"event_notifications": "'repmgrd_failover_promote,standby_promote,standby_follow'",
"event_notifications": "'repmgrd_failover_promote,standby_promote,standby_follow,child_node_disconnect,child_node_reconnect,child_node_new_connect'",
"location": r.Region,
"primary_visibility_consensus": true,
"failover_validation_command": fmt.Sprintf("'/usr/local/bin/failover_validation -visible-nodes %%v -total-nodes %%t'"),
Expand Down
2 changes: 1 addition & 1 deletion internal/flypg/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func clearLocks() error {
}
}

if err := removeZombieLock(); err != nil {
if err := RemoveZombieLock(); err != nil {
if !os.IsNotExist(err) {
return fmt.Errorf("failed to remove zombie lock pre-restore: %s", err)
}
Expand Down
29 changes: 25 additions & 4 deletions internal/flypg/zombie.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"errors"
"fmt"
"os"

"github.com/jackc/pgx/v5"
)

var (
Expand Down Expand Up @@ -35,15 +37,15 @@ func writeZombieLock(hostname string) error {
return nil
}

func removeZombieLock() error {
func RemoveZombieLock() error {
if err := os.Remove("/data/zombie.lock"); err != nil {
return err
}

return nil
}

func readZombieLock() (string, error) {
func ReadZombieLock() (string, error) {
body, err := os.ReadFile("/data/zombie.lock")
if err != nil {
return "", err
Expand Down Expand Up @@ -142,8 +144,27 @@ func ZombieDiagnosis(s *DNASample) (string, error) {
return "", ErrZombieDiagnosisUndecided
}

func printDNASample(s *DNASample) {
fmt.Printf("Registered members: %d, Active member(s): %d, Inactive member(s): %d, Conflicts detected: %d\n",
func Quarantine(ctx context.Context, conn *pgx.Conn, n *Node, primary string) error {
if primary != "" {
if err := n.PGBouncer.ConfigurePrimary(ctx, primary, true); err != nil {
return fmt.Errorf("failed to reconfigure pgbouncer: %s", err)
}
}

fmt.Println("Writing zombie.lock file.")
if err := writeZombieLock(""); err != nil {
return fmt.Errorf("failed to set zombie lock: %s", err)
}

if err := BroadcastReadonlyChange(ctx, n, true); err != nil {
return fmt.Errorf("failed to set read-only: %s", err)
}

return nil
}

func DNASampleString(s *DNASample) string {
return fmt.Sprintf("Registered members: %d, Active member(s): %d, Inactive member(s): %d, Conflicts detected: %d",
s.totalMembers,
s.totalActive,
s.totalInactive,
Expand Down

0 comments on commit 4d211f4

Please sign in to comment.