Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions cmd/backup/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,8 @@ db:
port: 5432
database: postgres
user: postgres
log:
development: false
level: info
location: true

26 changes: 16 additions & 10 deletions cmd/backup/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,18 @@ package main
import (
"flag"
"fmt"
"log"
"os"
"runtime"

"github.com/ikitiki/logical_backup/pkg/config"
"github.com/ikitiki/logical_backup/pkg/logger"
"github.com/ikitiki/logical_backup/pkg/logicalbackup"
)

var (
configFile = flag.String("config", "config.yaml", "path to the config file")
version = flag.Bool("version", false, "Print version information")
configFile = flag.String("config", "config.yaml", "path to the config file")
version = flag.Bool("version", false, "print version information")
development = flag.Bool("log-development", false, "enable development logging mode")

Version string
Revision string
Expand All @@ -27,8 +28,8 @@ func buildInfo() string {

func main() {
flag.Usage = func() {
fmt.Fprintf(os.Stderr, "%s\n", buildInfo())
fmt.Fprintf(os.Stderr, "\nUsage:\n")
_, _ = fmt.Fprintf(os.Stderr, "%s\n", buildInfo())
_, _ = fmt.Fprintf(os.Stderr, "\nUsage:\n")
flag.PrintDefaults()
}

Expand All @@ -39,25 +40,30 @@ func main() {
}

if _, err := os.Stat(*configFile); os.IsNotExist(err) {
fmt.Fprintf(os.Stderr, "Config file %s does not exist", *configFile)
_, _ = fmt.Fprintf(os.Stderr, "Config file %s does not exist", *configFile)
os.Exit(1)
}

cfg, err := config.New(*configFile)
cfg, err := config.New(*configFile, *development)
if err != nil {
fmt.Fprintf(os.Stderr, "Could not load config file: %v", err)
_, _ = fmt.Fprintf(os.Stderr, "Could not load config file: %v", err)
os.Exit(1)
}
// Initialize the logger after we've resolved the debug flag but before its first usage at cfg.Print
if err := logger.InitGlobalLogger(cfg.Log); err != nil {
_, _ = fmt.Fprintf(os.Stderr, "Could not initialize global logger")
os.Exit(1)
}

cfg.Print()

lb, err := logicalbackup.New(cfg)
if err != nil {
log.Fatalf("could not create backup instance: %v", err)
logger.G.Fatalf("could not create backup instance: %v", err)
}

if err := lb.Run(); err != nil {
fmt.Fprintln(os.Stderr, err)
_, _ = fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
}
44 changes: 38 additions & 6 deletions cmd/restore/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ package main

import (
"flag"
"log"
"fmt"
"os"

"github.com/jackc/pgx"

"github.com/ikitiki/logical_backup/pkg/logger"
"github.com/ikitiki/logical_backup/pkg/logicalrestore"
"github.com/ikitiki/logical_backup/pkg/message"
)
Expand All @@ -15,6 +16,9 @@ var (
pgUser, pgPass, pgHost, pgDbname *string
targetTable, backupDir, tableName, schemaName *string
pgPort *uint
logDevelopment *bool
logLevel *string
logShowLocation *bool
)

func init() {
Expand All @@ -29,6 +33,9 @@ func init() {
schemaName = flag.String("schema", "public", "Schema name")
targetTable = flag.String("target-table", "", "Target table name (optional)")
backupDir = flag.String("backup-dir", "", "Backups dir")
logDevelopment = flag.Bool("log-development", false, "Enable development logging mode")
logLevel = flag.String("log-level", "", "Set log level")
logShowLocation = flag.Bool("log-location", true, "Show log location")

flag.Parse()

Expand All @@ -38,9 +45,32 @@ func init() {
}
}

func makeLoggerConfig() *logger.LoggerConfig {
lc := logger.DefaultLogConfig()
lc.Development = *logDevelopment
lc.Location = logShowLocation

if *logLevel != "" {
if err := logger.ValidateLogLevel(*logLevel); err != nil {
_, _ = fmt.Fprint(os.Stderr, err)
}
lc.Level = *logLevel
}
return lc
}

func main() {

tbl := message.NamespacedName{Namespace: *schemaName, Name: *tableName}

lc := makeLoggerConfig()
if err := logger.InitGlobalLogger(lc, "table to restore", tbl.String()); err != nil {
_, _ = fmt.Fprintf(os.Stderr, "Could not initialize global logger")
os.Exit(1)
}

if *targetTable != "" {
log.Printf("restoring into %v", targetTable)
logger.G.Infof("restoring into %v", targetTable)
}

config := pgx.ConnConfig{
Expand All @@ -54,14 +84,16 @@ func main() {
// honor PGHOST, PGPORT and other libpq variables when set.
envConfig, err := pgx.ParseEnvLibpq()
if err != nil {
log.Fatalf("could not parse libpq environment variables: %v", err)
logger.G.WithError(err).Fatal("could not parse libpq environment variables: %v")
}
config = config.Merge(envConfig)

tbl := message.NamespacedName{Namespace: *schemaName, Name: *tableName}
r := logicalrestore.New(tbl, *backupDir, config)
r, err := logicalrestore.New(tbl, *backupDir, config, lc)
if err != nil {
logger.G.WithError(err).Fatal("could not initialize restore structure")
}

if err := r.Restore(); err != nil {
log.Fatalf("could not restore table: %v", err)
logger.G.WithError(err).Fatal("could not restore table")
}
}
86 changes: 56 additions & 30 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,41 @@ package config

import (
"fmt"
"log"
"os"
"strconv"
"strings"
"time"

"github.com/jackc/pgx"
"gopkg.in/yaml.v2"

"github.com/ikitiki/logical_backup/pkg/logger"
)

const (
defaultPrometheusPort = 1999
)

type Config struct {
DB pgx.ConnConfig `yaml:"db"`
SlotName string `yaml:"slotname"`
PublicationName string `yaml:"publication"`
TrackNewTables bool `yaml:"trackNewTables"`
DeltasPerFile int `yaml:"deltasPerFile"`
BackupThreshold int `yaml:"backupThreshold"`
ConcurrentBasebackups int `yaml:"concurrentBasebackups"`
InitialBasebackup bool `yaml:"initialBasebackup"`
Fsync bool `yaml:"fsync"`
StagingDir string `yaml:"StagingDir"`
ArchiveDir string `yaml:"archiveDir"`
ForceBasebackupAfterInactivityInterval time.Duration `yaml:"forceBasebackupAfterInactivityInterval"`
ArchiverTimeout time.Duration `yaml:"archiverTimeout"`
PrometheusPort int `yaml:"prometheusPort"`
DB pgx.ConnConfig `yaml:"db"`
SlotName string `yaml:"slotname"`
PublicationName string `yaml:"publication"`
TrackNewTables bool `yaml:"trackNewTables"`
DeltasPerFile int `yaml:"deltasPerFile"`
BackupThreshold int `yaml:"backupThreshold"`
ConcurrentBasebackups int `yaml:"concurrentBasebackups"`
InitialBasebackup bool `yaml:"initialBasebackup"`
Fsync bool `yaml:"fsync"`
StagingDir string `yaml:"StagingDir"`
ArchiveDir string `yaml:"archiveDir"`
ForceBasebackupAfterInactivityInterval time.Duration `yaml:"forceBasebackupAfterInactivityInterval"`
ArchiverTimeout time.Duration `yaml:"archiverTimeout"`
PrometheusPort int `yaml:"prometheusPort"`
Log *logger.LoggerConfig `yaml:"log"`
}

func New(filename string) (*Config, error) {
// New constructs a new Config instance.
func New(filename string, developmentMode bool) (*Config, error) {
var cfg Config

fp, err := os.Open(filename)
Expand All @@ -40,7 +45,7 @@ func New(filename string) (*Config, error) {
}
defer func() {
if err := fp.Close(); err != nil {
log.Printf("could not close file: %v", err)
fmt.Printf("could not close file: %v", err)
}
}()

Expand All @@ -62,25 +67,46 @@ func New(filename string) (*Config, error) {
cfg.PrometheusPort = defaultPrometheusPort
}

defaultLoggerConfig := logger.DefaultLogConfig()
if cfg.Log == nil {
cfg.Log = defaultLoggerConfig
}
if cfg.Log.Location == nil {
cfg.Log.Location = defaultLoggerConfig.Location
}

if developmentMode {
cfg.Log.Development = developmentMode
}

if err := logger.ValidateLogLevel(cfg.Log.Level); err != nil {
return nil, err
}

return &cfg, nil
}

func (c Config) Print() {
if c.StagingDir != "" {
log.Printf("Staging directory: %q", c.StagingDir)
pr := logger.PrintOption

if c.StagingDir == "" {
logger.G.Info("No staging directory specified. Files will be written directly to the archive directory")
} else {
log.Printf("No staging directory. Writing directly to the archive dir")
pr("Staging directory", c.StagingDir)
}

log.Printf("Archive directory: %q", c.ArchiveDir)
log.Printf("BackupThreshold: %v", c.BackupThreshold)
log.Printf("DeltasPerFile: %v", c.DeltasPerFile)
log.Printf("DB connection string: %s@%s:%d/%s slot:%q publication:%q",
c.DB.User, c.DB.Host, c.DB.Port, c.DB.Database, c.SlotName, c.PublicationName)
log.Printf("Backing up new tables: %t", c.TrackNewTables)
log.Printf("Fsync: %t", c.Fsync)
pr("Archive directory", c.ArchiveDir)
pr("BackupThreshold", strconv.Itoa(c.BackupThreshold))
pr("DeltasPerFile", strconv.Itoa(c.DeltasPerFile))
pr("DB Connection String",
"%s@%s:%d/%s slot:%q publication:%q", c.DB.User, c.DB.Host, c.DB.Port, c.DB.Database, c.SlotName, c.PublicationName)
pr("Track New Tables", strconv.FormatBool(c.TrackNewTables))
pr("Fsync", strconv.FormatBool(c.Fsync))
if c.ForceBasebackupAfterInactivityInterval > 0 {
log.Printf("Force new basebackup of a modified table after inactivity for: %v",
c.ForceBasebackupAfterInactivityInterval)
pr("Force new backups of a modified table after inactivity", c.ForceBasebackupAfterInactivityInterval.String())
}
pr("Log development mode", strconv.FormatBool(c.Log.Development))
if c.Log.Level != "" {
pr("Log level", strings.ToUpper(c.Log.Level))
}
pr("Log includes file location", strconv.FormatBool(*c.Log.Location))
}
2 changes: 0 additions & 2 deletions pkg/dbutils/dbutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"database/sql"
"database/sql/driver"
"fmt"
"log"

"github.com/jackc/pgx"
"github.com/jackc/pgx/pgtype"
Expand Down Expand Up @@ -107,7 +106,6 @@ func CreateMissingPublication(conn *pgx.Conn, publicationName string) error {
return fmt.Errorf("could not create publication: %v", err)
}
rows.Close()
log.Printf("created missing publication: %q", query)

return nil
}
Expand Down
Loading