diff --git a/cmd/event_handler/main.go b/cmd/event_handler/main.go index eab76a80..49d25909 100644 --- a/cmd/event_handler/main.go +++ b/cmd/event_handler/main.go @@ -2,6 +2,7 @@ package main import ( "context" + "errors" "flag" "fmt" "log" @@ -10,6 +11,7 @@ import ( "time" "github.com/fly-apps/postgres-flex/internal/flypg" + "github.com/jackc/pgx/v5" ) const eventLogFile = "/data/event.log" @@ -26,6 +28,8 @@ func main() { details := flag.String("details", "", "details") flag.Parse() + ctx := context.Background() + logFile, err := os.OpenFile(eventLogFile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644) if err != nil { fmt.Printf("failed to open event log: %s", err) @@ -91,6 +95,38 @@ func main() { os.Exit(1) } + case "child_node_disconnect", "child_node_reconnect", "child_node_new_connect": + node, err := flypg.NewNode() + if err != nil { + log.Printf("failed to initialize node: %s", err) + os.Exit(1) + } + + conn, err := node.RepMgr.NewLocalConnection(ctx) + if err != nil { + log.Printf("failed to open local connection: %s", err) + os.Exit(1) + } + defer conn.Close(ctx) + + member, err := node.RepMgr.Member(ctx, conn) + if err != nil { + log.Printf("failed to resolve member: %s", err) + os.Exit(1) + } + + if member.Role != flypg.PrimaryRoleName { + // We should never get here. + log.Println("skipping since we are not the primary") + os.Exit(0) + } + + if err := evaluateClusterState(ctx, conn, node); err != nil { + log.Printf("failed to evaluate cluster state: %s", err) + os.Exit(0) + } + + os.Exit(0) default: // noop } @@ -118,3 +154,50 @@ func reconfigurePGBouncer(id int) error { return nil } + +func evaluateClusterState(ctx context.Context, conn *pgx.Conn, node *flypg.Node) error { + standbys, err := node.RepMgr.StandbyMembers(ctx, conn) + if err != nil { + if !errors.Is(err, pgx.ErrNoRows) { + return fmt.Errorf("failed to query standbys") + } + } + + sample, err := flypg.TakeDNASample(ctx, node, standbys) + if err != nil { + return fmt.Errorf("failed to evaluate cluster data: %s", err) + } + + log.Println(flypg.DNASampleString(sample)) + + primary, err := flypg.ZombieDiagnosis(sample) + if errors.Is(err, flypg.ErrZombieDiagnosisUndecided) || errors.Is(err, flypg.ErrZombieDiscovered) { + // Quarantine primary + if err := flypg.Quarantine(ctx, conn, node, primary); err != nil { + return fmt.Errorf("failed to quarantine failed primary: %s", err) + } + + return fmt.Errorf("primary has been quarantined: %s", err) + } else if err != nil { + return fmt.Errorf("failed to run zombie diagnosis: %s", err) + } + + // Clear zombie lock if it exists + if flypg.ZombieLockExists() { + log.Println("Clearing zombie lock and enabling read/write") + if err := flypg.RemoveZombieLock(); err != nil { + return fmt.Errorf("failed to remove zombie lock: %s", err) + } + + log.Println("Broadcasting readonly state change") + if err := flypg.BroadcastReadonlyChange(ctx, node, false); err != nil { + log.Printf("errors while disabling readonly: %s", err) + } + } + + if err := node.PGBouncer.ConfigurePrimary(ctx, primary, true); err != nil { + return fmt.Errorf("failed to reconfigure pgbouncer primary %s", err) + } + + return nil +} diff --git a/internal/flypg/node.go b/internal/flypg/node.go index a8b3b8a4..848e880a 100644 --- a/internal/flypg/node.go +++ b/internal/flypg/node.go @@ -145,7 +145,7 @@ func (n *Node) Init(ctx context.Context) error { if ZombieLockExists() { fmt.Println("Zombie lock detected!") - primaryStr, err := readZombieLock() + primaryStr, err := ReadZombieLock() if err != nil { return fmt.Errorf("failed to read zombie lock: %s", primaryStr) } @@ -172,7 +172,7 @@ func (n *Node) Init(ctx context.Context) error { // Confirm that our rejoin target still identifies itself as the primary. if primary.Hostname != ip.String() { // Clear the zombie.lock file so we can attempt to re-resolve the correct primary. - if err := removeZombieLock(); err != nil { + if err := RemoveZombieLock(); err != nil { return fmt.Errorf("failed to remove zombie lock: %s", err) } @@ -191,7 +191,7 @@ func (n *Node) Init(ctx context.Context) error { // TODO - Wait for target cluster to register self as a standby. - if err := removeZombieLock(); err != nil { + if err := RemoveZombieLock(); err != nil { return fmt.Errorf("failed to remove zombie lock: %s", err) } @@ -348,45 +348,25 @@ func (n *Node) PostInit(ctx context.Context) error { return fmt.Errorf("failed to resolve cluster metrics: %s", err) } - printDNASample(sample) + fmt.Println(DNASampleString(sample)) // Evaluate whether we are a zombie or not. primary, err := ZombieDiagnosis(sample) if errors.Is(err, ErrZombieDiagnosisUndecided) { fmt.Println("Unable to confirm that we are the true primary!") - fmt.Println("Writing zombie.lock file.") - if err := writeZombieLock(""); err != nil { - return fmt.Errorf("failed to set zombie lock: %s", err) + if err := Quarantine(ctx, conn, n, primary); err != nil { + return fmt.Errorf("failed to quarantine failed primary: %s", err) } - fmt.Println("Turning all user-created databases readonly.") - if err := BroadcastReadonlyChange(ctx, n, true); err != nil { - return fmt.Errorf("failed to set read-only: %s", err) - } - - // TODO - Add link to docs - fmt.Println("Please refer to following documentation for more information: .") - } else if errors.Is(err, ErrZombieDiscovered) { - fmt.Println("Zombie primary discovered!") fmt.Printf("The majority of registered members agree that '%s' is the real primary.\n", primary) - fmt.Printf("Reconfiguring PGBouncer to point to '%s'\n", primary) - if err := n.PGBouncer.ConfigurePrimary(ctx, primary, true); err != nil { - return fmt.Errorf("failed to reconfigure pgbouncer: %s", err) - } - - fmt.Println("Writing zombie.lock file") - if err := writeZombieLock(primary); err != nil { - return fmt.Errorf("failed to set zombie lock: %s", err) - } - - fmt.Println("Turning user-created databases read-only") - if err := BroadcastReadonlyChange(ctx, n, true); err != nil { - return fmt.Errorf("failed to set read-only: %s", err) + if err := Quarantine(ctx, conn, n, primary); err != nil { + return fmt.Errorf("failed to quarantine failed primary: %s", err) } - + // Issue panic to force a process restart so we can attempt to rejoin + // the the cluster we've diverged from. panic(err) } else if err != nil { return fmt.Errorf("failed to run zombie diagnosis: %s", err) diff --git a/internal/flypg/pgbouncer.go b/internal/flypg/pgbouncer.go index 383580b5..238e04fb 100644 --- a/internal/flypg/pgbouncer.go +++ b/internal/flypg/pgbouncer.go @@ -62,7 +62,7 @@ func (p *PGBouncer) ConfigurePrimary(ctx context.Context, primary string, reload if reload { err = p.reloadConfig(ctx) if err != nil { - fmt.Printf("failed to reconfigure pgbouncer primary %s\n", err) + return fmt.Errorf("failed to reconfigure pgbouncer primary: %s", err) } } return nil diff --git a/internal/flypg/readonly.go b/internal/flypg/readonly.go index 580992fd..a43ad711 100644 --- a/internal/flypg/readonly.go +++ b/internal/flypg/readonly.go @@ -124,7 +124,7 @@ func changeReadOnlyState(ctx context.Context, n *Node, enable bool) error { databases, err := admin.ListDatabases(ctx, conn) if err != nil { - return err + return fmt.Errorf("failed to list database: %s", err) } var dbNames []string diff --git a/internal/flypg/repmgr.go b/internal/flypg/repmgr.go index e1ddc972..73526d04 100644 --- a/internal/flypg/repmgr.go +++ b/internal/flypg/repmgr.go @@ -123,7 +123,7 @@ func (r *RepMgr) setDefaults() { "promote_command": fmt.Sprintf("'repmgr standby promote -f %s --log-to-file'", r.ConfigPath), "follow_command": fmt.Sprintf("'repmgr standby follow -f %s --log-to-file --upstream-node-id=%%n'", r.ConfigPath), "event_notification_command": fmt.Sprintf("'/usr/local/bin/event_handler -node-id %%n -event %%e -success %%s -details \"%%d\" -new-node-id \\'%%p\\''"), - "event_notifications": "'repmgrd_failover_promote,standby_promote,standby_follow'", + "event_notifications": "'repmgrd_failover_promote,standby_promote,standby_follow,child_node_disconnect,child_node_reconnect,child_node_new_connect'", "location": r.Region, "primary_visibility_consensus": true, "failover_validation_command": fmt.Sprintf("'/usr/local/bin/failover_validation -visible-nodes %%v -total-nodes %%t'"), diff --git a/internal/flypg/restore.go b/internal/flypg/restore.go index 0872aa7a..ef13d691 100644 --- a/internal/flypg/restore.go +++ b/internal/flypg/restore.go @@ -210,7 +210,7 @@ func clearLocks() error { } } - if err := removeZombieLock(); err != nil { + if err := RemoveZombieLock(); err != nil { if !os.IsNotExist(err) { return fmt.Errorf("failed to remove zombie lock pre-restore: %s", err) } diff --git a/internal/flypg/zombie.go b/internal/flypg/zombie.go index fbc5a3c6..6a240cc4 100644 --- a/internal/flypg/zombie.go +++ b/internal/flypg/zombie.go @@ -5,6 +5,8 @@ import ( "errors" "fmt" "os" + + "github.com/jackc/pgx/v5" ) var ( @@ -35,7 +37,7 @@ func writeZombieLock(hostname string) error { return nil } -func removeZombieLock() error { +func RemoveZombieLock() error { if err := os.Remove("/data/zombie.lock"); err != nil { return err } @@ -43,7 +45,7 @@ func removeZombieLock() error { return nil } -func readZombieLock() (string, error) { +func ReadZombieLock() (string, error) { body, err := os.ReadFile("/data/zombie.lock") if err != nil { return "", err @@ -142,8 +144,27 @@ func ZombieDiagnosis(s *DNASample) (string, error) { return "", ErrZombieDiagnosisUndecided } -func printDNASample(s *DNASample) { - fmt.Printf("Registered members: %d, Active member(s): %d, Inactive member(s): %d, Conflicts detected: %d\n", +func Quarantine(ctx context.Context, conn *pgx.Conn, n *Node, primary string) error { + if primary != "" { + if err := n.PGBouncer.ConfigurePrimary(ctx, primary, true); err != nil { + return fmt.Errorf("failed to reconfigure pgbouncer: %s", err) + } + } + + fmt.Println("Writing zombie.lock file.") + if err := writeZombieLock(""); err != nil { + return fmt.Errorf("failed to set zombie lock: %s", err) + } + + if err := BroadcastReadonlyChange(ctx, n, true); err != nil { + return fmt.Errorf("failed to set read-only: %s", err) + } + + return nil +} + +func DNASampleString(s *DNASample) string { + return fmt.Sprintf("Registered members: %d, Active member(s): %d, Inactive member(s): %d, Conflicts detected: %d", s.totalMembers, s.totalActive, s.totalInactive,