Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Address Collation Mismatches #230

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 154 additions & 0 deletions internal/flypg/collations.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package flypg

import (
"context"
"crypto/sha256"
"database/sql"
"encoding/hex"
"fmt"
"log"
"os"

"github.com/fly-apps/postgres-flex/internal/utils"
"github.com/jackc/pgx/v5"
)

const collationVersionFile = "/data/.collationVersion"

// calculateLocaleVersionHash invokes `locale --version` through
// utils.RunCommand (user "postgres") and returns the hex-encoded SHA-256
// digest of whatever the command printed.
//
// An error is returned, wrapped, when the command itself fails.
func calculateLocaleVersionHash() (string, error) {
	raw, err := utils.RunCommand("locale --version", "postgres")
	if err != nil {
		return "", fmt.Errorf("failed to read locale version: %w", err)
	}

	// hash.Hash.Write never returns an error, so the result is not checked.
	digest := sha256.New()
	digest.Write(raw)

	return hex.EncodeToString(digest.Sum(nil)), nil
}

// writeCollationVersionFile stores versionHash as the full contents of
// collationVersionFile, creating or truncating the file with mode 0600.
func writeCollationVersionFile(versionHash string) error {
	const ownerReadWrite os.FileMode = 0600

	contents := []byte(versionHash)
	return os.WriteFile(collationVersionFile, contents, ownerReadWrite)
}

// collationHashChanged reports whether versionHash differs from the hash
// recorded in collationVersionFile.
//
// A missing file counts as a change and yields (true, nil). Any other failure
// to read the file is returned as a wrapped error with a false result.
func collationHashChanged(versionHash string) (bool, error) {
	// Read the file directly rather than stat-ing it first: a single call
	// removes the window in which the file could disappear between the
	// existence check and the read.
	oldVersionHash, err := os.ReadFile(collationVersionFile)
	switch {
	case os.IsNotExist(err):
		// No recorded hash yet, so there is nothing to match against.
		return true, nil
	case err != nil:
		return false, fmt.Errorf("failed to read collation lock file: %w", err)
	}

	// Compare the version hashes.
	return versionHash != string(oldVersionHash), nil
}

// identifyImpactedCollationObjectsSQL selects, for every pg_collation row
// whose stored collversion differs from pg_collation_actual_version, the
// pg_depend entries that reference that collation. Each result row holds two
// text columns produced by pg_describe_object: "Collation" (the referenced
// collation) and "Object" (the dependent object), ordered by both columns.
const identifyImpactedCollationObjectsSQL = `
SELECT pg_describe_object(refclassid, refobjid, refobjsubid) AS "Collation",
pg_describe_object(classid, objid, objsubid) AS "Object"
FROM pg_depend d JOIN pg_collation c
ON refclassid = 'pg_collation'::regclass AND refobjid = c.oid
WHERE c.collversion <> pg_collation_actual_version(c.oid)
ORDER BY 1, 2;
`

// collationObject is one row returned by identifyImpactedCollationObjectsSQL:
// a collation with a version mismatch and an object that depends on it.
type collationObject struct {
// collation is the value of the query's "Collation" column.
collation string
// object is the value of the query's "Object" column.
object string
}

// impactedCollationObjects runs identifyImpactedCollationObjectsSQL on conn
// and returns one collationObject per result row, in query order.
//
// The returned slice is nil when the query yields no rows. Query, scan and
// iteration failures are returned wrapped with %w so callers can inspect the
// underlying error with errors.Is / errors.As.
func impactedCollationObjects(ctx context.Context, conn *pgx.Conn) ([]collationObject, error) {
	rows, err := conn.Query(ctx, identifyImpactedCollationObjectsSQL)
	if err != nil {
		return nil, fmt.Errorf("failed to query impacted objects: %w", err)
	}
	defer rows.Close()

	var objects []collationObject
	for rows.Next() {
		// Scan straight into a fresh value so no state carries over between rows.
		var obj collationObject
		if err := rows.Scan(&obj.collation, &obj.object); err != nil {
			return nil, fmt.Errorf("failed to scan row: %w", err)
		}
		objects = append(objects, obj)
	}

	// rows.Next returning false may mean an error rather than end of data.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("failed to iterate over rows: %w", err)
	}

	return objects, nil
}

// refreshCollations refreshes collation versions for the database reachable
// through dbConn. For every database except "template1" it first runs
// refreshDatabaseCollations; it then always runs refreshDatabase and returns
// that call's result.
func refreshCollations(ctx context.Context, dbConn *pgx.Conn, dbName string) error {
	isTemplate := dbName == "template1"

	if !isTemplate {
		err := refreshDatabaseCollations(ctx, dbConn, dbName)
		if err != nil {
			return err
		}
	}

	return refreshDatabase(ctx, dbConn, dbName)
}

// refreshDatabaseCollations fetches the collation names via fetchCollations
// and calls refreshCollation for each of them over dbConn.
//
// Only a failure to fetch the list is returned as an error. A failure to
// refresh an individual collation is logged as a warning (tagged with dbName)
// and the loop carries on with the next one.
func refreshDatabaseCollations(ctx context.Context, dbConn *pgx.Conn, dbName string) error {
	names, fetchErr := fetchCollations(ctx, dbConn)
	if fetchErr != nil {
		return fmt.Errorf("failed to fetch collations: %w", fetchErr)
	}

	for i := range names {
		refreshErr := refreshCollation(ctx, dbConn, names[i])
		if refreshErr == nil {
			continue
		}
		log.Printf("[WARN] failed to refresh collation version in db %s: %v\n", dbName, refreshErr)
	}

	return nil
}

// refreshCollation issues ALTER COLLATION ... REFRESH VERSION for the named
// collation in the pg_catalog schema and returns the error from Exec, if any.
//
// The qualified name is built with pgx.Identifier.Sanitize, so a collation
// name containing a double quote is escaped instead of breaking (or altering)
// the statement.
func refreshCollation(ctx context.Context, dbConn *pgx.Conn, collation string) error {
	qualified := pgx.Identifier{"pg_catalog", collation}.Sanitize()
	_, err := dbConn.Exec(ctx, "ALTER COLLATION "+qualified+" REFRESH VERSION;")
	return err
}

// refreshDatabase issues ALTER DATABASE ... REFRESH COLLATION VERSION for
// dbName over dbConn.
//
// The database name is quoted with pgx.Identifier.Sanitize. Unquoted, a name
// containing upper-case letters, hyphens or other special characters would
// either fail to parse or be folded to a different identifier.
//
// Any Exec failure is returned wrapped.
func refreshDatabase(ctx context.Context, dbConn *pgx.Conn, dbName string) error {
	quoted := pgx.Identifier{dbName}.Sanitize()
	if _, err := dbConn.Exec(ctx, "ALTER DATABASE "+quoted+" REFRESH COLLATION VERSION;"); err != nil {
		return fmt.Errorf("failed to refresh database collation version: %w", err)
	}
	return nil
}

// fetchCollations returns the distinct datcollate values found in pg_database,
// excluding the "C" collation. NULL values are skipped.
//
// Query and scan failures are returned wrapped; an error reported by the row
// iterator after the loop is returned as-is.
func fetchCollations(ctx context.Context, dbConn *pgx.Conn) ([]string, error) {
	const query = "SELECT DISTINCT datcollate FROM pg_database WHERE datcollate != 'C'"

	rows, queryErr := dbConn.Query(ctx, query)
	if queryErr != nil {
		return nil, fmt.Errorf("failed to fetch collations: %w", queryErr)
	}
	defer rows.Close()

	var names []string
	for rows.Next() {
		var name sql.NullString
		if scanErr := rows.Scan(&name); scanErr != nil {
			return nil, fmt.Errorf("failed to scan collation row: %w", scanErr)
		}
		if !name.Valid {
			continue
		}
		names = append(names, name.String)
	}

	if iterErr := rows.Err(); iterErr != nil {
		return nil, iterErr
	}

	return names, nil
}
97 changes: 87 additions & 10 deletions internal/flypg/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,12 @@ func (n *Node) PostInit(ctx context.Context) error {
return fmt.Errorf("failed to unset read-only: %s", err)
}
}

// This is a safety check to ensure collation integrity is maintained.
if err := n.evaluateCollationIntegrity(ctx, conn); err != nil {
log.Printf("[WARN] Problem occurred while evaluating collation integrity: %s", err)
}

case StandbyRoleName:
// Register existing standby to apply any configuration changes.
if err := n.RepMgr.registerStandby(daemonRestartRequired); err != nil {
Expand Down Expand Up @@ -381,18 +387,18 @@ func (n *Node) PostInit(ctx context.Context) error {
return fmt.Errorf("failed to issue registration certificate: %s", err)
}
} else {
if n.RepMgr.Witness {
log.Println("Registering witness")
// Create required users
if err := n.setupCredentials(ctx, conn); err != nil {
return fmt.Errorf("failed to create required users: %s", err)
}

// Create required users
if err := n.setupCredentials(ctx, conn); err != nil {
return fmt.Errorf("failed to create required users: %s", err)
}
// Setup repmgr database and extension
if err := n.RepMgr.enable(ctx, conn); err != nil {
return fmt.Errorf("failed to enable repmgr: %s", err)
}

// Setup repmgr database and extension
if err := n.RepMgr.enable(ctx, conn); err != nil {
return fmt.Errorf("failed to enable repmgr: %s", err)
}
if n.RepMgr.Witness {
log.Println("Registering witness")

primary, err := n.RepMgr.ResolveMemberOverDNS(ctx)
if err != nil {
Expand Down Expand Up @@ -471,3 +477,74 @@ func setDirOwnership() error {

return nil
}

// evaluateCollationIntegrity refreshes collation versions across all databases
// when the locale version hash differs from the one recorded on disk.
//
// Flow:
//  1. Compute the current locale version hash and compare it with the stored
//     one; return immediately when nothing changed.
//  2. List the databases through conn, add "template1", and for each one
//     refresh its collations and count the objects still reporting a
//     collation version mismatch.
//  3. Persist the new hash only when no mismatches remain, so that a later
//     boot re-runs the evaluation and keeps logging outstanding issues.
//
// The first failure encountered is returned wrapped; in that case the version
// file is left untouched.
func (n *Node) evaluateCollationIntegrity(ctx context.Context, conn *pgx.Conn) error {
	// Calculate the current collation version hash.
	versionHash, err := calculateLocaleVersionHash()
	if err != nil {
		return fmt.Errorf("failed to calculate collation sum: %w", err)
	}

	// Check to see if the collation version has changed.
	changed, err := collationHashChanged(versionHash)
	if err != nil {
		return fmt.Errorf("failed to check collation version file: %w", err)
	}

	if !changed {
		return nil
	}

	fmt.Printf("[INFO] Evaluating collation integrity.\n")

	dbs, err := admin.ListDatabases(ctx, conn)
	if err != nil {
		return fmt.Errorf("failed to list databases: %w", err)
	}

	dbs = append(dbs, admin.DbInfo{Name: "template1"})

	collationIssues := 0

	for _, db := range dbs {
		// Each database is handled in its own call so that its connection is
		// closed before the next one is opened.
		issues, err := n.evaluateDatabaseCollations(ctx, db.Name)
		if err != nil {
			return err
		}
		collationIssues += issues
	}

	// Don't set the version file if there are collation issues.
	// This will force the system to re-evaluate the collation integrity on the next boot and ensure
	// issues continue to be logged.
	if collationIssues > 0 {
		return nil
	}

	// No collation issues found, we can safely update the version file.
	// This will prevent the system from re-evaluating the collation integrity on every boot.
	if err := writeCollationVersionFile(versionHash); err != nil {
		return fmt.Errorf("failed to write collation version file: %w", err)
	}

	return nil
}

// evaluateDatabaseCollations opens a local superuser connection to dbName,
// refreshes its collations, logs a warning for every object that still has a
// collation version mismatch, and returns how many such objects were found.
// The connection is closed before the function returns.
func (n *Node) evaluateDatabaseCollations(ctx context.Context, dbName string) (int, error) {
	// Establish a connection to the database.
	dbConn, err := n.NewLocalConnection(ctx, dbName, n.SUCredentials)
	if err != nil {
		return 0, fmt.Errorf("failed to establish connection to database %s: %w", dbName, err)
	}
	// Close errors are deliberately ignored: the connection is only used for
	// this evaluation and nothing can be done about a failed close.
	defer func() { _ = dbConn.Close(ctx) }()

	if err := refreshCollations(ctx, dbConn, dbName); err != nil {
		return 0, fmt.Errorf("failed to refresh collations for db %s: %w", dbName, err)
	}

	// TODO - Consider logging a link to documentation on how to resolve collation issues not resolved by the refresh process.

	// The collation refresh process should resolve "most" issues, but there are cases that may require
	// re-indexing or other manual intervention. In the event any objects are found we will log a warning.
	colObjects, err := impactedCollationObjects(ctx, dbConn)
	if err != nil {
		return 0, fmt.Errorf("failed to fetch impacted collation objects: %w", err)
	}

	for _, obj := range colObjects {
		log.Printf("[WARN] Collation mismatch detected - Database %s, Collation: %s, Object: %s\n", dbName, obj.collation, obj.object)
	}

	return len(colObjects), nil
}
28 changes: 18 additions & 10 deletions internal/utils/shell.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,37 +5,45 @@ import (
"fmt"
"io"
"log"

"os"
"os/exec"
"os/user"
"strconv"
"syscall"
)

// TODO - RunCommand should take a context.

func RunCommand(cmdStr, usr string) ([]byte, error) {
uid, gid, err := SystemUserIDs(usr)
if err != nil {
return nil, err
}

log.Printf("> Running command as %s: %s\n", usr, cmdStr)

cmd := exec.Command("sh", "-c", cmdStr)
cmd.SysProcAttr = &syscall.SysProcAttr{}
cmd.SysProcAttr.Credential = &syscall.Credential{Uid: uint32(uid), Gid: uint32(gid)}

var stdoutBuf, stderrBuf bytes.Buffer
cmd.Stdout = io.MultiWriter(os.Stdout, &stdoutBuf)
cmd.Stderr = io.MultiWriter(os.Stderr, &stderrBuf)
debug := os.Getenv("DEBUG")
if debug != "" {
log.Printf("> Running command as %s: %s\n", usr, cmdStr)

err = cmd.Run()
if err != nil {
if ee, ok := err.(*exec.ExitError); ok {
ee.Stderr = stderrBuf.Bytes()
var stdoutBuf, stderrBuf bytes.Buffer
cmd.Stdout = io.MultiWriter(os.Stdout, &stdoutBuf)
cmd.Stderr = io.MultiWriter(os.Stderr, &stderrBuf)

err = cmd.Run()
if err != nil {
if ee, ok := err.(*exec.ExitError); ok {
ee.Stderr = stderrBuf.Bytes()
}
}

return stdoutBuf.Bytes(), err
}

return stdoutBuf.Bytes(), err
return cmd.Output()
}

func SetFileOwnership(pathToFile, owner string) error {
Expand Down
Loading