Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

switch node names to machine ID #251

Merged
merged 21 commits into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion bin/restart-repmgrd
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
#!/bin/bash
#
# Signals the repmgrd process whose PID is recorded in /tmp/repmgrd.pid.
# Nothing happens when the PID file is missing, empty, or refers to a process
# that is no longer running.
# NOTE(review): presumably a supervisor brings repmgrd back up after it
# exits -- confirm against the process manager configuration.

PID_FILE=/tmp/repmgrd.pid

if [ -f "$PID_FILE" ]; then
    PID=$(cat "$PID_FILE")

    # Only signal a process that is actually running; a stale PID file would
    # otherwise make kill fail.
    if [ -n "$PID" ] && ps -p "$PID" > /dev/null 2>&1; then
        kill "$PID"
    fi
fi
11 changes: 10 additions & 1 deletion cmd/pg_unregister/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"encoding/base64"
"errors"
"fmt"
"log"
"os"
Expand Down Expand Up @@ -44,7 +45,15 @@ func processUnregistration(ctx context.Context) error {
defer func() { _ = conn.Close(ctx) }()

member, err := node.RepMgr.MemberByHostname(ctx, conn, string(hostnameBytes))
if err != nil {
if errors.Is(err, pgx.ErrNoRows) {
// for historical reasons, old versions of flyctl pass in the 6pn as the hostname
// most likely this won't work because the hostname does not resolve if the machine is stopped,
// but we try anyway
member, err = node.RepMgr.MemberBy6PN(ctx, conn, string(hostnameBytes))
if err != nil {
return fmt.Errorf("failed to resolve member by hostname and 6pn: %s", err)
}
} else if err != nil {
return fmt.Errorf("failed to resolve member: %s", err)
}

Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ require (
github.com/hashicorp/consul/api v1.18.0
github.com/jackc/pgconn v1.14.3
github.com/jackc/pgx/v5 v5.5.4
github.com/olekukonko/tablewriter v0.0.5
github.com/pkg/errors v0.9.1
github.com/pkg/term v1.1.0
github.com/spf13/cobra v1.8.1
github.com/superfly/fly-checks v0.0.0-20230510154016-d189351293f2
golang.org/x/exp v0.0.0-20230105202349-8879d0199aa3
golang.org/x/sync v0.1.0
Expand All @@ -36,8 +38,6 @@ require (
github.com/mattn/go-runewidth v0.0.9 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/mitchellh/mapstructure v1.4.1 // indirect
github.com/olekukonko/tablewriter v0.0.5 // indirect
github.com/spf13/cobra v1.8.1 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stretchr/objx v0.5.0 // indirect
golang.org/x/crypto v0.20.0 // indirect
Expand Down
65 changes: 62 additions & 3 deletions internal/flypg/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ import (
"github.com/fly-apps/postgres-flex/internal/privnet"
"github.com/fly-apps/postgres-flex/internal/utils"
"github.com/jackc/pgx/v5"
"golang.org/x/exp/slices"
)

type Node struct {
AppName string
MachineID string
PrivateIP string
PrimaryRegion string
DataDir string
Expand Down Expand Up @@ -52,6 +54,8 @@ func NewNode() (*Node, error) {

node.PrivateIP = ipv6.String()

node.MachineID = os.Getenv("FLY_MACHINE_ID")

node.PrimaryRegion = os.Getenv("PRIMARY_REGION")
if node.PrimaryRegion == "" {
return nil, fmt.Errorf("PRIMARY_REGION environment variable must be set")
Expand Down Expand Up @@ -89,6 +93,7 @@ func NewNode() (*Node, error) {
PasswordConfigPath: "/data/.pgpass",
DataDir: node.DataDir,
PrivateIP: node.PrivateIP,
MachineID: node.MachineID,
Port: 5433,
DatabaseName: "repmgr",
Credentials: node.ReplCredentials,
Expand Down Expand Up @@ -265,7 +270,7 @@ func (n *Node) PostInit(ctx context.Context) error {
return fmt.Errorf("failed to resolve member role: %s", err)
}

// Restart repmgrd in the event the IP changes for an already registered node.
// Restart repmgrd in the event the machine ID changes for an already registered node.
// This can happen if the underlying volume is moved to a different node.
daemonRestartRequired := n.RepMgr.daemonRestartRequired(member)

Expand All @@ -279,6 +284,8 @@ func (n *Node) PostInit(ctx context.Context) error {
if err := Quarantine(ctx, n, primary); err != nil {
return fmt.Errorf("failed to quarantine failed primary: %s", err)
}

panic(err)
} else if errors.Is(err, ErrZombieDiscovered) {
log.Printf("[ERROR] The majority of registered members agree that '%s' is the real primary.\n", primary)
// Turn member read-only
Expand All @@ -292,10 +299,10 @@ func (n *Node) PostInit(ctx context.Context) error {
}

// This should never happen
if primary != n.PrivateIP {
if primary != n.RepMgr.machineIdToDNS(n.MachineID) {
return fmt.Errorf("resolved primary '%s' does not match ourself '%s'. this should not happen",
primary,
n.PrivateIP,
n.RepMgr.machineIdToDNS(n.MachineID),
)
}

Expand All @@ -311,6 +318,11 @@ func (n *Node) PostInit(ctx context.Context) error {
}
}
case StandbyRoleName:
if err := n.migrateNodeNameIfNeeded(ctx, repConn); err != nil {
log.Printf("[ERROR] failed to migrate node name: %s", err)
// We try to bring the standby up anyway
}

// Register existing standby to apply any configuration changes.
if err := n.RepMgr.registerStandby(daemonRestartRequired); err != nil {
return fmt.Errorf("failed to register existing standby: %s", err)
Expand Down Expand Up @@ -527,3 +539,50 @@ func (n *Node) handleRemoteRestore(ctx context.Context, store *state.Store) erro

return nil
}

// migrateNodeNameIfNeeded switches this standby's replication application_name
// from its 6PN address to the machine-ID-based name when the primary still
// reports the old value.
//
// It resolves the primary member through repConn, opens a separate connection
// to that primary and collects every application_name listed in
// pg_stat_replication. If one of them equals n.PrivateIP, the replication
// configuration is regenerated and PostgreSQL is asked to reload its
// configuration over repConn. When no entry matches, nothing is changed and
// nil is returned.
//
// Errors wrap the underlying cause with %w so callers can inspect it with
// errors.Is / errors.As.
func (n *Node) migrateNodeNameIfNeeded(ctx context.Context, repConn *pgx.Conn) error {
	primary, err := n.RepMgr.PrimaryMember(ctx, repConn)
	if err != nil {
		return fmt.Errorf("failed to resolve primary member when updating standby: %w", err)
	}

	primaryConn, err := n.RepMgr.NewRemoteConnection(ctx, primary.Hostname)
	if err != nil {
		return fmt.Errorf("failed to establish connection to primary: %w", err)
	}
	defer func() { _ = primaryConn.Close(ctx) }()

	rows, err := primaryConn.Query(ctx, "select application_name from pg_stat_replication")
	if err != nil {
		return fmt.Errorf("failed to query pg_stat_replication: %w", err)
	}
	defer rows.Close()

	var applicationNames []string
	for rows.Next() {
		var applicationName string
		if err := rows.Scan(&applicationName); err != nil {
			return fmt.Errorf("failed to scan application_name: %w", err)
		}
		applicationNames = append(applicationNames, applicationName)
	}
	if err := rows.Err(); err != nil {
		return fmt.Errorf("failed to iterate over rows: %w", err)
	}

	// Nothing to migrate unless the primary still knows us by our 6pn address.
	if !slices.Contains(applicationNames, n.PrivateIP) {
		return nil
	}

	// Our 6pn is still in use as application_name: regenerate
	// postgresql.auto.conf and reload postgresql so the new name takes effect.
	log.Printf("pg_stat_replication on the primary has our ipv6 address as application_name, converting to machine ID...")

	if err := n.RepMgr.regenReplicationConf(ctx); err != nil {
		return fmt.Errorf("failed to switch application_name to machine ID: %w", err)
	}

	if err := admin.ReloadPostgresConfig(ctx, repConn); err != nil {
		return fmt.Errorf("failed to reload postgresql: %w", err)
	}

	return nil
}
4 changes: 2 additions & 2 deletions internal/flypg/readonly.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func BroadcastReadonlyChange(ctx context.Context, n *Node, enabled bool) error {

for _, member := range members {
if member.Role == PrimaryRoleName {
endpoint := fmt.Sprintf("http://[%s]:5500/%s", member.Hostname, target)
endpoint := fmt.Sprintf("http://%s:5500/%s", member.Hostname, target)
resp, err := http.Get(endpoint)
if err != nil {
log.Printf("[WARN] Failed to broadcast readonly state change to member %s: %s", member.Hostname, err)
Expand All @@ -85,7 +85,7 @@ func BroadcastReadonlyChange(ctx context.Context, n *Node, enabled bool) error {
}

for _, member := range members {
endpoint := fmt.Sprintf("http://[%s]:5500/%s", member.Hostname, RestartHaproxyEndpoint)
endpoint := fmt.Sprintf("http://%s:5500/%s", member.Hostname, RestartHaproxyEndpoint)
resp, err := http.Get(endpoint)
if err != nil {
log.Printf("[WARN] Failed to restart haproxy on member %s: %s", member.Hostname, err)
Expand Down
91 changes: 81 additions & 10 deletions internal/flypg/repmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type RepMgr struct {
PrimaryRegion string
Region string
PrivateIP string
MachineID string
DataDir string
DatabaseName string
Credentials admin.Credential
Expand Down Expand Up @@ -161,10 +162,12 @@ func (r *RepMgr) setDefaults() error {
return err
}

hostname := r.machineIdToDNS(r.MachineID)

conf := ConfigMap{
"node_id": nodeID,
"node_name": fmt.Sprintf("'%s'", r.PrivateIP),
"conninfo": fmt.Sprintf("'host=%s port=%d user=%s dbname=%s connect_timeout=5'", r.PrivateIP, r.Port, r.Credentials.Username, r.DatabaseName),
"node_name": fmt.Sprintf("'%s'", hostname),
"conninfo": fmt.Sprintf("'host=%s port=%d user=%s dbname=%s connect_timeout=5'", hostname, r.Port, r.Credentials.Username, r.DatabaseName),
"data_directory": fmt.Sprintf("'%s'", r.DataDir),
"failover": "'automatic'",
"use_replication_slots": "yes",
Expand Down Expand Up @@ -276,7 +279,7 @@ func (*RepMgr) restartDaemon() error {
}

// daemonRestartRequired reports whether the hostname recorded for m differs
// from the DNS name derived from this node's machine ID.
//
// setDefaults registers the node under machineIdToDNS(r.MachineID), so the
// comparison uses that same derived name; comparing against the bare machine
// ID would report a mismatch on every call.
// NOTE(review): assumes Member.Hostname carries the registered node_name --
// confirm against the Members query.
func (r *RepMgr) daemonRestartRequired(m *Member) bool {
	return m.Hostname != r.machineIdToDNS(r.MachineID)
}

func (r *RepMgr) unregisterWitness(id int) error {
Expand All @@ -301,14 +304,14 @@ func (r *RepMgr) rejoinCluster(hostname string) error {
return err
}

func (r *RepMgr) clonePrimary(ipStr string) error {
func (r *RepMgr) clonePrimary(hostname string) error {
cmdStr := fmt.Sprintf("mkdir -p %s", r.DataDir)
if _, err := utils.RunCommand(cmdStr, "postgres"); err != nil {
return fmt.Errorf("failed to create pg directory: %s", err)
}

cmdStr = fmt.Sprintf("repmgr -h %s -p %d -d %s -U %s -f %s standby clone -c -F",
ipStr,
hostname,
r.Port,
r.DatabaseName,
r.Credentials.Username,
Expand All @@ -322,6 +325,21 @@ func (r *RepMgr) clonePrimary(ipStr string) error {
return nil
}

// regenReplicationConf runs "repmgr standby clone" with
// --replication-conf-only as the postgres user, passing this node's port,
// database name, replication username and repmgr config path. The host
// argument is deliberately passed as an empty string.
//
// Any failure from the command is returned with added context.
func (r *RepMgr) regenReplicationConf(ctx context.Context) error {
	port := fmt.Sprint(r.Port)

	// TODO: do we need -c?
	_, runErr := utils.RunCmd(ctx, "postgres",
		"repmgr", "--replication-conf-only",
		"-h", "",
		"-p", port,
		"-d", r.DatabaseName,
		"-U", r.Credentials.Username,
		"-f", r.ConfigPath,
		"standby", "clone", "-F")
	if runErr == nil {
		return nil
	}

	return fmt.Errorf("failed to regenerate replication conf: %s", runErr)
}

type Member struct {
ID int
Hostname string
Expand Down Expand Up @@ -431,26 +449,56 @@ func (*RepMgr) MemberByHostname(ctx context.Context, pg *pgx.Conn, hostname stri
return &member, nil
}

// MemberBy6PN returns the registered member whose hostname resolves to the
// given 6PN address.
//
// Every member returned by Members has its hostname looked up through the
// private-network resolver, and the textual form of each resulting address is
// compared against ip. A failed lookup for one member does not stop the
// search; the most recent lookup error is reported only if no member matches.
//
// When all lookups succeed but none matches, the returned error wraps
// pgx.ErrNoRows, so callers never receive a nil member together with a nil
// error and can detect "not found" with errors.Is.
func (r *RepMgr) MemberBy6PN(ctx context.Context, pg *pgx.Conn, ip string) (*Member, error) {
	members, err := r.Members(ctx, pg)
	if err != nil {
		return nil, err
	}

	resolver := privnet.GetResolver()
	var lastErr error
	for _, member := range members {
		ips, err := resolver.LookupIPAddr(ctx, member.Hostname)
		if err != nil {
			// Remember the failure but keep going: another member may still match.
			lastErr = err
			continue
		}

		for _, addr := range ips {
			if addr.IP.String() == ip {
				return &member, nil
			}
		}
	}

	if lastErr != nil {
		return nil, fmt.Errorf("no matches found for %s, and error encountered: %w", ip, lastErr)
	}

	// Report "not found" as an error instead of (nil, nil) so that callers
	// which only check err do not go on to use a nil member.
	return nil, fmt.Errorf("no member found with 6pn address %s: %w", ip, pgx.ErrNoRows)
}

func (r *RepMgr) ResolveMemberOverDNS(ctx context.Context) (*Member, error) {
ips, err := r.InRegionPeerIPs(ctx)
machineIds, err := r.InRegionPeerMachines(ctx)
if err != nil {
return nil, err
}

var target *Member

for _, ip := range ips {
if ip.String() == r.PrivateIP {
for _, machineId := range machineIds {
if machineId == r.MachineID {
continue
}

conn, err := r.NewRemoteConnection(ctx, ip.String())
conn, err := r.NewRemoteConnection(ctx, r.machineIdToDNS(machineId))
if err != nil {
continue
}
defer func() { _ = conn.Close(ctx) }()

member, err := r.MemberByHostname(ctx, conn, ip.String())
member, err := r.MemberByHostname(ctx, conn, r.machineIdToDNS(machineId))
if err != nil {
continue
}
Expand All @@ -477,6 +525,21 @@ func (r *RepMgr) InRegionPeerIPs(ctx context.Context) ([]net.IPAddr, error) {
return privnet.AllPeers(ctx, targets)
}

// InRegionPeerMachines lists the IDs of the app's machines whose region equals
// r.PrimaryRegion. Machines are fetched with privnet.AllMachines for
// r.AppName; the local machine is not filtered out here. The result is nil
// when no machine matches.
func (r *RepMgr) InRegionPeerMachines(ctx context.Context) ([]string, error) {
	all, err := privnet.AllMachines(ctx, r.AppName)
	if err != nil {
		return nil, err
	}

	var ids []string
	for _, m := range all {
		// Skip machines that live outside the primary region.
		if m.Region != r.PrimaryRegion {
			continue
		}
		ids = append(ids, m.Id)
	}

	return ids, nil
}

func (r *RepMgr) HostInRegion(ctx context.Context, hostname string) (bool, error) {
ips, err := r.InRegionPeerIPs(ctx)
if err != nil {
Expand Down Expand Up @@ -514,3 +577,11 @@ func (r *RepMgr) UnregisterMember(member Member) error {
// eligiblePrimary reports whether this node's region is the primary region.
func (r *RepMgr) eligiblePrimary() bool {
	inPrimaryRegion := r.Region == r.PrimaryRegion
	return inPrimaryRegion
}

// machineIdToDNS builds the internal DNS name for a machine in the form
// "<machine id>.vm.<app name>.internal", using r.AppName as the app name.
//
// It panics when nodeName is not exactly 14 bytes long.
func (r *RepMgr) machineIdToDNS(nodeName string) string {
	const machineIDLen = 14

	if len(nodeName) == machineIDLen {
		return fmt.Sprintf("%s.vm.%s.internal", nodeName, r.AppName)
	}

	panic("invalid machine id")
}
6 changes: 4 additions & 2 deletions internal/flypg/repmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func TestRepmgrInitialization(t *testing.T) {
UserConfigPath: repgmrUserConfigFilePath,
PasswordConfigPath: repgmrPasswordConfigFilePath,
DataDir: repmgrTestDirectory,
MachineID: "abcdefg1234567",
PrivateIP: "127.0.0.1",
Credentials: admin.Credential{
Username: "user",
Expand Down Expand Up @@ -91,8 +92,8 @@ func TestRepmgrInitialization(t *testing.T) {
t.Fatal(err)
}

if config["node_name"] != "'127.0.0.1'" {
t.Fatalf("expected node_name to be '127.0.0.1', got %v", config["node_name"])
if config["node_name"] != "'abcdefg1234567.vm.test-app.internal'" {
t.Fatalf("expected node_name to be 'abcdefg1234567.vm.test-app.internal', got %v", config["node_name"])
}

if config["location"] != "'dev'" {
Expand Down Expand Up @@ -122,6 +123,7 @@ func TestRepmgrNodeIDGeneration(t *testing.T) {

DataDir: repmgrTestDirectory,
PrivateIP: "127.0.0.1",
MachineID: "abcdefg1234567",
Port: 5433,
DatabaseName: "repmgr",
Credentials: admin.Credential{
Expand Down
Loading
Loading