From a39f76f861d75cf86389bef79b7cab9dc5d9a6dd Mon Sep 17 00:00:00 2001 From: Ben Iofel Date: Tue, 13 Aug 2024 10:59:58 -0400 Subject: [PATCH] switch node names to machine ID (#251) * WIP switch node names to machine ID * fix replica clone * updates * fixes * TODO * fix application_name on replica startup * move to function * reuse code * upgrade primary * clean up the diff * fix tests * silence warning * missed a few * fix deepsource callout * make restart-repmgrd more resilient * make pg_unregister work with new names * Accept migration failures * add missing panic * update pg_unregister comment * remove old comment --- bin/restart-repmgrd | 9 +++- cmd/pg_unregister/main.go | 11 ++++- go.mod | 4 +- internal/flypg/node.go | 65 +++++++++++++++++++++++-- internal/flypg/readonly.go | 4 +- internal/flypg/repmgr.go | 91 +++++++++++++++++++++++++++++++---- internal/flypg/repmgr_test.go | 6 ++- internal/flypg/zombie.go | 19 +++----- internal/privnet/sixpn.go | 58 ++++++++++++++++------ 9 files changed, 220 insertions(+), 47 deletions(-) diff --git a/bin/restart-repmgrd b/bin/restart-repmgrd index 10ef6bef..9b3d4db0 100755 --- a/bin/restart-repmgrd +++ b/bin/restart-repmgrd @@ -1,3 +1,10 @@ #!/bin/bash -kill `cat /tmp/repmgrd.pid` +if [ -f /tmp/repmgrd.pid ]; then + PID=$(cat /tmp/repmgrd.pid) + + # Check if the process is running + if ps -p $PID > /dev/null 2>&1; then + kill $PID + fi +fi \ No newline at end of file diff --git a/cmd/pg_unregister/main.go b/cmd/pg_unregister/main.go index c1ac9517..049ab09a 100644 --- a/cmd/pg_unregister/main.go +++ b/cmd/pg_unregister/main.go @@ -3,6 +3,7 @@ package main import ( "context" "encoding/base64" + "errors" "fmt" "log" "os" @@ -44,7 +45,15 @@ func processUnregistration(ctx context.Context) error { defer func() { _ = conn.Close(ctx) }() member, err := node.RepMgr.MemberByHostname(ctx, conn, string(hostnameBytes)) - if err != nil { + if errors.Is(err, pgx.ErrNoRows) { + // for historical reasons, old versions of flyctl passes in 
the 6pn as the hostname + // most likely this won't work because the hostname does not resolve if the machine is stopped, + // but we try anyway + member, err = node.RepMgr.MemberBy6PN(ctx, conn, string(hostnameBytes)) + if err != nil { + return fmt.Errorf("failed to resolve member by hostname and 6pn: %s", err) + } + } else if err != nil { return fmt.Errorf("failed to resolve member: %s", err) } diff --git a/go.mod b/go.mod index b1b1e9fb..9cf3f9ad 100644 --- a/go.mod +++ b/go.mod @@ -8,8 +8,10 @@ require ( github.com/hashicorp/consul/api v1.18.0 github.com/jackc/pgconn v1.14.3 github.com/jackc/pgx/v5 v5.5.4 + github.com/olekukonko/tablewriter v0.0.5 github.com/pkg/errors v0.9.1 github.com/pkg/term v1.1.0 + github.com/spf13/cobra v1.8.1 github.com/superfly/fly-checks v0.0.0-20230510154016-d189351293f2 golang.org/x/exp v0.0.0-20230105202349-8879d0199aa3 golang.org/x/sync v0.1.0 @@ -36,8 +38,6 @@ require ( github.com/mattn/go-runewidth v0.0.9 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect github.com/mitchellh/mapstructure v1.4.1 // indirect - github.com/olekukonko/tablewriter v0.0.5 // indirect - github.com/spf13/cobra v1.8.1 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/stretchr/objx v0.5.0 // indirect golang.org/x/crypto v0.20.0 // indirect diff --git a/internal/flypg/node.go b/internal/flypg/node.go index 3b099218..28d832c0 100644 --- a/internal/flypg/node.go +++ b/internal/flypg/node.go @@ -16,10 +16,12 @@ import ( "github.com/fly-apps/postgres-flex/internal/privnet" "github.com/fly-apps/postgres-flex/internal/utils" "github.com/jackc/pgx/v5" + "golang.org/x/exp/slices" ) type Node struct { AppName string + MachineID string PrivateIP string PrimaryRegion string DataDir string @@ -52,6 +54,8 @@ func NewNode() (*Node, error) { node.PrivateIP = ipv6.String() + node.MachineID = os.Getenv("FLY_MACHINE_ID") + node.PrimaryRegion = os.Getenv("PRIMARY_REGION") if node.PrimaryRegion == "" { return nil, fmt.Errorf("PRIMARY_REGION 
environment variable must be set") @@ -89,6 +93,7 @@ func NewNode() (*Node, error) { PasswordConfigPath: "/data/.pgpass", DataDir: node.DataDir, PrivateIP: node.PrivateIP, + MachineID: node.MachineID, Port: 5433, DatabaseName: "repmgr", Credentials: node.ReplCredentials, @@ -265,7 +270,7 @@ func (n *Node) PostInit(ctx context.Context) error { return fmt.Errorf("failed to resolve member role: %s", err) } - // Restart repmgrd in the event the IP changes for an already registered node. + // Restart repmgrd in the event the machine ID changes for an already registered node. // This can happen if the underlying volume is moved to a different node. daemonRestartRequired := n.RepMgr.daemonRestartRequired(member) @@ -279,6 +284,8 @@ func (n *Node) PostInit(ctx context.Context) error { if err := Quarantine(ctx, n, primary); err != nil { return fmt.Errorf("failed to quarantine failed primary: %s", err) } + + panic(err) } else if errors.Is(err, ErrZombieDiscovered) { log.Printf("[ERROR] The majority of registered members agree that '%s' is the real primary.\n", primary) // Turn member read-only @@ -292,10 +299,10 @@ func (n *Node) PostInit(ctx context.Context) error { } // This should never happen - if primary != n.PrivateIP { + if primary != n.RepMgr.machineIdToDNS(n.MachineID) { return fmt.Errorf("resolved primary '%s' does not match ourself '%s'. this should not happen", primary, - n.PrivateIP, + n.RepMgr.machineIdToDNS(n.MachineID), ) } @@ -311,6 +318,11 @@ func (n *Node) PostInit(ctx context.Context) error { } } case StandbyRoleName: + if err := n.migrateNodeNameIfNeeded(ctx, repConn); err != nil { + log.Printf("[ERROR] failed to migrate node name: %s", err) + // We try to bring the standby up anyway + } + // Register existing standby to apply any configuration changes. 
if err := n.RepMgr.registerStandby(daemonRestartRequired); err != nil {
 			return fmt.Errorf("failed to register existing standby: %s", err)
@@ -527,3 +539,50 @@ func (n *Node) handleRemoteRestore(ctx context.Context, store *state.Store) erro
 
 	return nil
 }
+
+// migrateNodeNameIfNeeded regenerates the replication config and reloads postgres when the primary still sees this standby under its 6PN address.
+func (n *Node) migrateNodeNameIfNeeded(ctx context.Context, repConn *pgx.Conn) error {
+	primary, err := n.RepMgr.PrimaryMember(ctx, repConn)
+	if err != nil {
+		return fmt.Errorf("failed to resolve primary member when updating standby: %s", err)
+	}
+
+	primaryConn, err := n.RepMgr.NewRemoteConnection(ctx, primary.Hostname)
+	if err != nil {
+		return fmt.Errorf("failed to establish connection to primary: %s", err)
+	}
+	defer func() { _ = primaryConn.Close(ctx) }()
+
+	rows, err := primaryConn.Query(ctx, "select application_name from pg_stat_replication")
+	if err != nil {
+		return fmt.Errorf("failed to query pg_stat_replication: %s", err)
+	}
+	defer rows.Close()
+
+	var applicationNames []string
+	for rows.Next() {
+		var applicationName string
+		if err := rows.Scan(&applicationName); err != nil {
+			return fmt.Errorf("failed to scan application_name: %s", err)
+		}
+		applicationNames = append(applicationNames, applicationName)
+	}
+	if err := rows.Err(); err != nil {
+		return fmt.Errorf("failed to iterate over rows: %s", err)
+	}
+
+	// if we find our 6pn as application_name, we need to regenerate postgresql.auto.conf and reload postgresql
+	if slices.Contains(applicationNames, n.PrivateIP) {
+		log.Printf("pg_stat_replication on the primary has our ipv6 address as application_name, converting to machine ID...")
+
+		if err := n.RepMgr.regenReplicationConf(ctx); err != nil {
+			return fmt.Errorf("failed to regenerate postgresql.auto.conf: %s", err)
+		}
+
+		if err := admin.ReloadPostgresConfig(ctx, repConn); err != nil {
+			return fmt.Errorf("failed to reload postgresql: %s", err)
+		}
+	}
+
+	return nil
+}
diff --git a/internal/flypg/readonly.go b/internal/flypg/readonly.go
index
a714c254..1c075538 100644 --- a/internal/flypg/readonly.go +++ b/internal/flypg/readonly.go @@ -70,7 +70,7 @@ func BroadcastReadonlyChange(ctx context.Context, n *Node, enabled bool) error { for _, member := range members { if member.Role == PrimaryRoleName { - endpoint := fmt.Sprintf("http://[%s]:5500/%s", member.Hostname, target) + endpoint := fmt.Sprintf("http://%s:5500/%s", member.Hostname, target) resp, err := http.Get(endpoint) if err != nil { log.Printf("[WARN] Failed to broadcast readonly state change to member %s: %s", member.Hostname, err) @@ -85,7 +85,7 @@ func BroadcastReadonlyChange(ctx context.Context, n *Node, enabled bool) error { } for _, member := range members { - endpoint := fmt.Sprintf("http://[%s]:5500/%s", member.Hostname, RestartHaproxyEndpoint) + endpoint := fmt.Sprintf("http://%s:5500/%s", member.Hostname, RestartHaproxyEndpoint) resp, err := http.Get(endpoint) if err != nil { log.Printf("[WARN] Failed to restart haproxy on member %s: %s", member.Hostname, err) diff --git a/internal/flypg/repmgr.go b/internal/flypg/repmgr.go index 46ceab8c..f6687169 100644 --- a/internal/flypg/repmgr.go +++ b/internal/flypg/repmgr.go @@ -34,6 +34,7 @@ type RepMgr struct { PrimaryRegion string Region string PrivateIP string + MachineID string DataDir string DatabaseName string Credentials admin.Credential @@ -161,10 +162,12 @@ func (r *RepMgr) setDefaults() error { return err } + hostname := r.machineIdToDNS(r.MachineID) + conf := ConfigMap{ "node_id": nodeID, - "node_name": fmt.Sprintf("'%s'", r.PrivateIP), - "conninfo": fmt.Sprintf("'host=%s port=%d user=%s dbname=%s connect_timeout=5'", r.PrivateIP, r.Port, r.Credentials.Username, r.DatabaseName), + "node_name": fmt.Sprintf("'%s'", hostname), + "conninfo": fmt.Sprintf("'host=%s port=%d user=%s dbname=%s connect_timeout=5'", hostname, r.Port, r.Credentials.Username, r.DatabaseName), "data_directory": fmt.Sprintf("'%s'", r.DataDir), "failover": "'automatic'", "use_replication_slots": "yes", @@ -276,7 
+279,7 @@ func (*RepMgr) restartDaemon() error {
 }
 
 func (r *RepMgr) daemonRestartRequired(m *Member) bool {
-	return m.Hostname != r.PrivateIP
+	return m.Hostname != r.machineIdToDNS(r.MachineID) // node_name is registered as the machine's DNS name, not the bare ID
 }
 
 func (r *RepMgr) unregisterWitness(id int) error {
@@ -301,14 +304,14 @@ func (r *RepMgr) rejoinCluster(hostname string) error {
 	return err
 }
 
-func (r *RepMgr) clonePrimary(ipStr string) error {
+func (r *RepMgr) clonePrimary(hostname string) error {
 	cmdStr := fmt.Sprintf("mkdir -p %s", r.DataDir)
 	if _, err := utils.RunCommand(cmdStr, "postgres"); err != nil {
 		return fmt.Errorf("failed to create pg directory: %s", err)
 	}
 
 	cmdStr = fmt.Sprintf("repmgr -h %s -p %d -d %s -U %s -f %s standby clone -c -F",
-		ipStr,
+		hostname,
 		r.Port,
 		r.DatabaseName,
 		r.Credentials.Username,
@@ -322,6 +325,21 @@ func (r *RepMgr) clonePrimary(ipStr string) error {
 	return nil
 }
 
+func (r *RepMgr) regenReplicationConf(ctx context.Context) error {
+	// TODO: do we need -c?
+	if _, err := utils.RunCmd(ctx, "postgres",
+		"repmgr", "--replication-conf-only",
+		"-h", "",
+		"-p", fmt.Sprint(r.Port),
+		"-d", r.DatabaseName,
+		"-U", r.Credentials.Username,
+		"-f", r.ConfigPath,
+		"standby", "clone", "-F"); err != nil {
+		return fmt.Errorf("failed to regenerate replication conf: %s", err)
+	}
+	return nil
+}
+
 type Member struct {
 	ID       int
 	Hostname string
@@ -431,26 +449,56 @@ func (*RepMgr) MemberByHostname(ctx context.Context, pg *pgx.Conn, hostname stri
 	return &member, nil
 }
 
+// MemberBy6PN returns a member by its 6PN address.
+func (r *RepMgr) MemberBy6PN(ctx context.Context, pg *pgx.Conn, ip string) (*Member, error) {
+	members, err := r.Members(ctx, pg)
+	if err != nil {
+		return nil, err
+	}
+	// Resolve each member's hostname and compare the resulting addresses against ip.
+	resolver := privnet.GetResolver()
+	var lastErr error
+	for _, member := range members {
+		ips, err := resolver.LookupIPAddr(ctx, member.Hostname)
+		if err != nil {
+			lastErr = err
+			continue
+		}
+
+		for _, addr := range ips {
+			if addr.IP.String() == ip {
+				return &member, nil
+			}
+		}
+	}
+
+	if lastErr != nil {
+		return nil, fmt.Errorf("no matches found for %s, and error encountered: %s", ip, lastErr)
+	}
+	// Nothing matched: return an error so callers never receive a nil member together with a nil error.
+	return nil, fmt.Errorf("no member found for %s: %w", ip, pgx.ErrNoRows)
+}
+
 func (r *RepMgr) ResolveMemberOverDNS(ctx context.Context) (*Member, error) {
-	ips, err := r.InRegionPeerIPs(ctx)
+	machineIds, err := r.InRegionPeerMachines(ctx)
 	if err != nil {
 		return nil, err
 	}
 
 	var target *Member
 
-	for _, ip := range ips {
-		if ip.String() == r.PrivateIP {
+	for _, machineId := range machineIds {
+		if machineId == r.MachineID {
 			continue
 		}
 
-		conn, err := r.NewRemoteConnection(ctx, ip.String())
+		conn, err := r.NewRemoteConnection(ctx, r.machineIdToDNS(machineId))
 		if err != nil {
 			continue
 		}
 		defer func() { _ = conn.Close(ctx) }()
 
-		member, err := r.MemberByHostname(ctx, conn, ip.String())
+		member, err := r.MemberByHostname(ctx, conn, r.machineIdToDNS(machineId))
 		if err != nil {
 			continue
 		}
@@ -477,6 +525,21 @@ func (r *RepMgr) InRegionPeerIPs(ctx context.Context) ([]net.IPAddr, error) {
 	return privnet.AllPeers(ctx, targets)
 }
 
+func (r *RepMgr) InRegionPeerMachines(ctx context.Context) ([]string, error) {
+	machines, err := privnet.AllMachines(ctx, r.AppName)
+	if err != nil {
+		return nil, err
+	}
+
+	var machineIDs []string
+	for _, machine := range machines {
+		if machine.Region == r.PrimaryRegion {
+			machineIDs = append(machineIDs, machine.Id)
+		}
+	}
+	return machineIDs, nil
+}
+
 func (r *RepMgr) HostInRegion(ctx context.Context, hostname string) (bool, error) {
 	ips, err := r.InRegionPeerIPs(ctx)
 	if err != nil {
@@ -514,3 +577,11 @@ func (r
*RepMgr) UnregisterMember(member Member) error { func (r *RepMgr) eligiblePrimary() bool { return r.Region == r.PrimaryRegion } + +func (r *RepMgr) machineIdToDNS(nodeName string) string { + if len(nodeName) != 14 { + panic("invalid machine id") + } + + return fmt.Sprintf("%s.vm.%s.internal", nodeName, r.AppName) +} diff --git a/internal/flypg/repmgr_test.go b/internal/flypg/repmgr_test.go index 8251c7d4..d5233805 100644 --- a/internal/flypg/repmgr_test.go +++ b/internal/flypg/repmgr_test.go @@ -33,6 +33,7 @@ func TestRepmgrInitialization(t *testing.T) { UserConfigPath: repgmrUserConfigFilePath, PasswordConfigPath: repgmrPasswordConfigFilePath, DataDir: repmgrTestDirectory, + MachineID: "abcdefg1234567", PrivateIP: "127.0.0.1", Credentials: admin.Credential{ Username: "user", @@ -91,8 +92,8 @@ func TestRepmgrInitialization(t *testing.T) { t.Fatal(err) } - if config["node_name"] != "'127.0.0.1'" { - t.Fatalf("expected node_name to be '127.0.0.1', got %v", config["node_name"]) + if config["node_name"] != "'abcdefg1234567.vm.test-app.internal'" { + t.Fatalf("expected node_name to be 'abcdefg1234567.vm.test-app.internal', got %v", config["node_name"]) } if config["location"] != "'dev'" { @@ -122,6 +123,7 @@ func TestRepmgrNodeIDGeneration(t *testing.T) { DataDir: repmgrTestDirectory, PrivateIP: "127.0.0.1", + MachineID: "abcdefg1234567", Port: 5433, DatabaseName: "repmgr", Credentials: admin.Credential{ diff --git a/internal/flypg/zombie.go b/internal/flypg/zombie.go index 7204846d..6380338e 100644 --- a/internal/flypg/zombie.go +++ b/internal/flypg/zombie.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "log" - "net" "os" "github.com/fly-apps/postgres-flex/internal/utils" @@ -85,7 +84,7 @@ type DNASample struct { func TakeDNASample(ctx context.Context, node *Node, standbys []Member) (*DNASample, error) { sample := &DNASample{ - hostname: node.PrivateIP, + hostname: node.RepMgr.machineIdToDNS(node.MachineID), totalMembers: len(standbys) + 1, totalActive: 1, totalInactive: 0, 
@@ -118,7 +117,8 @@ func TakeDNASample(ctx context.Context, node *Node, standbys []Member) (*DNASamp sample.totalActive++ // Record conflict when primary doesn't match. - if primary.Hostname != node.PrivateIP { + // We're checking PrivateIP here for backwards compatibility + if primary.Hostname != node.RepMgr.machineIdToDNS(node.MachineID) && primary.Hostname != node.PrivateIP { sample.totalConflicts++ sample.conflictMap[primary.Hostname]++ } @@ -199,24 +199,19 @@ func handleZombieLock(ctx context.Context, n *Node) error { // If the zombie lock contains a hostname, it means we were able to // resolve the real primary and will attempt to rejoin it. if primaryStr != "" { - ip := net.ParseIP(primaryStr) - if ip == nil { - return fmt.Errorf("zombie.lock file contains an invalid ipv6 address") - } - - conn, err := n.RepMgr.NewRemoteConnection(ctx, ip.String()) + conn, err := n.RepMgr.NewRemoteConnection(ctx, primaryStr) if err != nil { - return fmt.Errorf("failed to establish a connection to our rejoin target %s: %s", ip.String(), err) + return fmt.Errorf("failed to establish a connection to our rejoin target %s: %s", primaryStr, err) } defer func() { _ = conn.Close(ctx) }() primary, err := n.RepMgr.PrimaryMember(ctx, conn) if err != nil { - return fmt.Errorf("failed to confirm primary on recover target %s: %s", ip.String(), err) + return fmt.Errorf("failed to confirm primary on recover target %s: %s", primaryStr, err) } // Confirm that our rejoin target still identifies itself as the primary. - if primary.Hostname != ip.String() { + if primary.Hostname != primaryStr { // Clear the zombie.lock file so we can attempt to re-resolve the correct primary. 
if err := RemoveZombieLock(); err != nil { return fmt.Errorf("failed to remove zombie lock: %s", err) diff --git a/internal/privnet/sixpn.go b/internal/privnet/sixpn.go index 85b1eaeb..5232b9b8 100644 --- a/internal/privnet/sixpn.go +++ b/internal/privnet/sixpn.go @@ -14,20 +14,7 @@ func AllPeers(ctx context.Context, appName string) ([]net.IPAddr, error) { } func Get6PN(ctx context.Context, hostname string) ([]net.IPAddr, error) { - nameserver := os.Getenv("FLY_NAMESERVER") - if nameserver == "" { - nameserver = "fdaa::3" - } - nameserver = net.JoinHostPort(nameserver, "53") - r := &net.Resolver{ - PreferGo: true, - Dial: func(ctx context.Context, network, address string) (net.Conn, error) { - d := net.Dialer{ - Timeout: 1 * time.Second, - } - return d.DialContext(ctx, "udp6", nameserver) - }, - } + r := GetResolver() ips, err := r.LookupIPAddr(ctx, hostname) if err != nil { @@ -54,6 +41,49 @@ func Get6PN(ctx context.Context, hostname string) ([]net.IPAddr, error) { return ips, err } +type Machine struct { + Id string + Region string +} + +func AllMachines(ctx context.Context, appName string) ([]Machine, error) { + r := GetResolver() + txts, err := r.LookupTXT(ctx, fmt.Sprintf("vms.%s.internal", appName)) + if err != nil { + return nil, err + } + + machines := make([]Machine, 0) + for _, txt := range txts { + parts := strings.Split(txt, ",") + for _, part := range parts { + parts := strings.Split(part, " ") + if len(parts) != 2 { + return nil, fmt.Errorf("invalid machine DNS TXT format: %s", txt) + } + machines = append(machines, Machine{Id: parts[0], Region: parts[1]}) + } + } + return machines, nil +} + +func GetResolver() *net.Resolver { + nameserver := os.Getenv("FLY_NAMESERVER") + if nameserver == "" { + nameserver = "fdaa::3" + } + nameserver = net.JoinHostPort(nameserver, "53") + return &net.Resolver{ + PreferGo: true, + Dial: func(ctx context.Context, _, _ string) (net.Conn, error) { + d := net.Dialer{ + Timeout: 1 * time.Second, + } + return 
d.DialContext(ctx, "udp6", nameserver) + }, + } +} + func PrivateIPv6() (net.IP, error) { ips, err := net.LookupIP("fly-local-6pn") if err != nil && !strings.HasSuffix(err.Error(), "no such host") && !strings.HasSuffix(err.Error(), "server misbehaving") {