Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

switch node names to machine ID #251

Merged
merged 21 commits into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions cmd/monitor/monitor_dead_members.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,13 @@ func deadMemberMonitorTick(ctx context.Context, node *flypg.Node, seenAt map[int
}

for _, voter := range votingMembers {
sConn, err := node.RepMgr.NewRemoteConnection(ctx, voter.Hostname)
sConn, err := node.RepMgr.NewRemoteConnection(ctx, voter.NodeName)
if err != nil {
// TODO - Verify the exception that's getting thrown.
if time.Since(seenAt[voter.ID]) >= deadMemberRemovalThreshold {
log.Printf("Removing dead member: %s\n", voter.Hostname)
log.Printf("Removing dead member: %s\n", voter.NodeName)
if err := node.RepMgr.UnregisterMember(voter); err != nil {
log.Printf("failed to unregister member %s: %v", voter.Hostname, err)
log.Printf("failed to unregister member %s: %v", voter.NodeName, err)
continue
}
delete(seenAt, voter.ID)
Expand Down
66 changes: 62 additions & 4 deletions internal/flypg/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ import (
"github.com/fly-apps/postgres-flex/internal/privnet"
"github.com/fly-apps/postgres-flex/internal/utils"
"github.com/jackc/pgx/v5"
"golang.org/x/exp/slices"
)

type Node struct {
AppName string
MachineID string
PrivateIP string
PrimaryRegion string
DataDir string
Expand Down Expand Up @@ -52,6 +54,8 @@ func NewNode() (*Node, error) {

node.PrivateIP = ipv6.String()

node.MachineID = os.Getenv("FLY_MACHINE_ID")

node.PrimaryRegion = os.Getenv("PRIMARY_REGION")
if node.PrimaryRegion == "" {
return nil, fmt.Errorf("PRIMARY_REGION environment variable must be set")
Expand Down Expand Up @@ -89,6 +93,7 @@ func NewNode() (*Node, error) {
PasswordConfigPath: "/data/.pgpass",
DataDir: node.DataDir,
PrivateIP: node.PrivateIP,
MachineID: node.MachineID,
Port: 5433,
DatabaseName: "repmgr",
Credentials: node.ReplCredentials,
Expand Down Expand Up @@ -187,7 +192,7 @@ func (n *Node) Init(ctx context.Context) error {
return fmt.Errorf("failed to resolve member over dns: %s", err)
}

if err := n.RepMgr.clonePrimary(cloneTarget.Hostname); err != nil {
if err := n.RepMgr.clonePrimary(cloneTarget.NodeName); err != nil {
// Clean-up the directory so it can be retried.
if rErr := os.Remove(n.DataDir); rErr != nil {
log.Printf("[ERROR] failed to cleanup postgresql dir after clone error: %s\n", rErr)
Expand Down Expand Up @@ -267,7 +272,9 @@ func (n *Node) PostInit(ctx context.Context) error {

// Restart repmgrd in the event the IP changes for an already registered node.
// This can happen if the underlying volume is moved to a different node.
daemonRestartRequired := n.RepMgr.daemonRestartRequired(member)
// TODO - this isn't an IP anymore
//daemonRestartRequired := n.RepMgr.daemonRestartRequired(member)
daemonRestartRequired := false

switch member.Role {
case PrimaryRoleName:
Expand Down Expand Up @@ -311,6 +318,10 @@ func (n *Node) PostInit(ctx context.Context) error {
}
}
case StandbyRoleName:
if err := n.migrateNodeNameIfNeeded(ctx, repConn); err != nil {
return fmt.Errorf("failed to migrate node name: %s", err)
}

// Register existing standby to apply any configuration changes.
if err := n.RepMgr.registerStandby(daemonRestartRequired); err != nil {
return fmt.Errorf("failed to register existing standby: %s", err)
Expand All @@ -322,7 +333,7 @@ func (n *Node) PostInit(ctx context.Context) error {
}

// Register existing witness to apply any configuration changes.
if err := n.RepMgr.registerWitness(primary.Hostname); err != nil {
if err := n.RepMgr.registerWitness(primary.NodeName); err != nil {
return fmt.Errorf("failed to register existing witness: %s", err)
}
default:
Expand Down Expand Up @@ -404,7 +415,7 @@ func (n *Node) PostInit(ctx context.Context) error {
return fmt.Errorf("failed to resolve primary member: %s", err)
}

if err := n.RepMgr.registerWitness(primary.Hostname); err != nil {
if err := n.RepMgr.registerWitness(primary.NodeName); err != nil {
return fmt.Errorf("failed to register witness: %s", err)
}
} else {
Expand Down Expand Up @@ -527,3 +538,50 @@ func (n *Node) handleRemoteRestore(ctx context.Context, store *state.Store) erro

return nil
}

// migrateNodeNameIfNeeded migrates this standby's replication
// application_name from its 6PN (private IPv6) address to its machine ID.
//
// It resolves the primary through repConn, opens a separate connection to the
// primary, and collects every application_name reported by the primary's
// pg_stat_replication view. If one of them equals n.PrivateIP, the replication
// configuration is regenerated and PostgreSQL is reloaded; otherwise nothing
// is changed.
//
// It returns nil when no migration was needed or the migration succeeded, and
// an error describing the failing step otherwise.
func (n *Node) migrateNodeNameIfNeeded(ctx context.Context, repConn *pgx.Conn) error {
	primary, err := n.RepMgr.PrimaryMember(ctx, repConn)
	if err != nil {
		return fmt.Errorf("failed to resolve primary member when updating standby: %s", err)
	}

	primaryConn, err := n.RepMgr.NewRemoteConnection(ctx, primary.NodeName)
	if err != nil {
		return fmt.Errorf("failed to establish connection to primary: %s", err)
	}
	// Best-effort close: there is nothing useful to do with a close error here.
	defer func() { _ = primaryConn.Close(ctx) }()

	rows, err := primaryConn.Query(ctx, "select application_name from pg_stat_replication")
	if err != nil {
		return fmt.Errorf("failed to query pg_stat_replication: %s", err)
	}
	// Deferred calls run in reverse order, so the rows are released before the
	// connection they belong to is closed.
	defer rows.Close()

	var applicationNames []string
	for rows.Next() {
		var applicationName string
		if err := rows.Scan(&applicationName); err != nil {
			return fmt.Errorf("failed to scan application_name: %s", err)
		}
		applicationNames = append(applicationNames, applicationName)
	}
	// rows.Next returning false may mean an error rather than end of data.
	if err := rows.Err(); err != nil {
		return fmt.Errorf("failed to iterate over rows: %s", err)
	}

	// if we find our 6pn as application_name, we need to regenerate postgresql.auto.conf and reload postgresql
	if slices.Contains(applicationNames, n.PrivateIP) {
		log.Printf("pg_stat_replication on the primary has our ipv6 address as application_name, converting to machine ID...")

		// Report the step that actually failed; no clone happens here.
		if err := n.RepMgr.regenReplicationConf(ctx); err != nil {
			return fmt.Errorf("failed to regenerate replication configuration: %s", err)
		}

		if err := n.PGConfig.reload(ctx); err != nil {
			return fmt.Errorf("failed to reload postgresql: %s", err)
		}
	}

	return nil
}
8 changes: 8 additions & 0 deletions internal/flypg/pg.go
Original file line number Diff line number Diff line change
Expand Up @@ -606,3 +606,11 @@ func diskSizeInBytes(dir string) (uint64, error) {
}
return stat.Blocks * uint64(stat.Bsize), nil
}

func (*PGConfig) reload(ctx context.Context) error {
benwaffle marked this conversation as resolved.
Show resolved Hide resolved
_, err := utils.RunCmd(ctx, "postgres", "pg_ctl", "-D", "/data/postgresql/", "reload")
if err != nil {
return fmt.Errorf("failed to reload postgres: %s", err)
}
return nil
}
12 changes: 6 additions & 6 deletions internal/flypg/readonly.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,31 +70,31 @@ func BroadcastReadonlyChange(ctx context.Context, n *Node, enabled bool) error {

for _, member := range members {
if member.Role == PrimaryRoleName {
endpoint := fmt.Sprintf("http://[%s]:5500/%s", member.Hostname, target)
endpoint := fmt.Sprintf("http://%s:5500/%s", fmt.Sprintf("%s.vm.%s.internal", member.NodeName, n.AppName), target)
resp, err := http.Get(endpoint)
if err != nil {
log.Printf("[WARN] Failed to broadcast readonly state change to member %s: %s", member.Hostname, err)
log.Printf("[WARN] Failed to broadcast readonly state change to member %s: %s", member.NodeName, err)
continue
}
defer func() { _ = resp.Body.Close() }()

if resp.StatusCode > 299 {
log.Printf("[WARN] Failed to broadcast readonly state change to member %s: %d\n", member.Hostname, resp.StatusCode)
log.Printf("[WARN] Failed to broadcast readonly state change to member %s: %d\n", member.NodeName, resp.StatusCode)
}
}
}

for _, member := range members {
endpoint := fmt.Sprintf("http://[%s]:5500/%s", member.Hostname, RestartHaproxyEndpoint)
endpoint := fmt.Sprintf("http://%s:5500/%s", fmt.Sprintf("%s.vm.%s.internal", member.NodeName, n.AppName), RestartHaproxyEndpoint)
resp, err := http.Get(endpoint)
if err != nil {
log.Printf("[WARN] Failed to restart haproxy on member %s: %s", member.Hostname, err)
log.Printf("[WARN] Failed to restart haproxy on member %s: %s", member.NodeName, err)
continue
}
defer func() { _ = resp.Body.Close() }()

if resp.StatusCode > 299 {
log.Printf("[WARN] Failed to restart haproxy on member %s: %d\n", member.Hostname, resp.StatusCode)
log.Printf("[WARN] Failed to restart haproxy on member %s: %d\n", member.NodeName, resp.StatusCode)
}
}

Expand Down
Loading
Loading