fix: Dump/Restore fails when masking tables with a generated column #184

Merged · 3 commits · Aug 28, 2024
6 changes: 5 additions & 1 deletion cmd/greenmask/cmd/restore/restore.go
@@ -175,6 +175,10 @@ func init() {
"batch-size", "", 0,
"the number of rows to insert in a single batch during the COPY command (0 - all rows will be inserted in a single batch)",
)
Cmd.Flags().BoolP(
"overriding-system-value", "", false,
"use OVERRIDING SYSTEM VALUE clause for INSERTs",
)

// Connection options:
Cmd.Flags().StringP("host", "h", "/var/run/postgres", "database server host or socket directory")
@@ -189,7 +193,7 @@ func init() {
		"disable-triggers", "enable-row-security", "if-exists", "no-comments", "no-data-for-failed-tables",
		"no-security-labels", "no-subscriptions", "no-table-access-method", "no-tablespaces", "section",
		"strict-names", "use-set-session-authorization", "inserts", "on-conflict-do-nothing", "restore-in-order",
		"pgzip", "batch-size", "overriding-system-value",

		"host", "port", "username",
	} {
1 change: 1 addition & 0 deletions docker/integration/filldb/.dockerignore
@@ -0,0 +1 @@
Dockerfile
2 changes: 1 addition & 1 deletion docker/integration/filldb/Dockerfile
@@ -8,7 +8,7 @@ ENV TMP_DIR=/tmp/schema

RUN apt-get update && apt-get install -y wget && mkdir /tmp/schema

COPY . /

RUN chmod +x ./filldb.sh

5 changes: 5 additions & 0 deletions docker/integration/filldb/filldb.sh
@@ -15,13 +15,18 @@

cd $TMP_DIR
if [ ! -f $FILE_DUMP ]; then
    echo "Downloading dump file"
    wget https://edu.postgrespro.com/$FILE_DUMP
fi
IFS="," read -ra PG_VERSIONS_CHECK <<< "${PG_VERSIONS_CHECK}"
for pgver in ${PG_VERSIONS_CHECK[@]}; do
    echo "Restoring database for PostgreSQL $pgver"
    if psql -p 5432 -h db-$pgver -U postgres -c 'CREATE DATABASE demo;'; then
        psql -p 5432 -h db-$pgver -U postgres -c 'DROP DATABASE demo_restore;'
        psql -p 5432 -h db-$pgver -U postgres -c 'CREATE DATABASE demo_restore;'
        gzip -dc $FILE_DUMP | psql -p 5432 -h db-$pgver -U postgres -d demo
        # Generated (STORED) columns are only available since PostgreSQL 12, so skip version 11
        if [ $pgver -ne '11' ]; then
            psql -p 5432 -h db-$pgver -U postgres -d demo -f /generated.sql
        fi
    fi
done
9 changes: 9 additions & 0 deletions docker/integration/filldb/generated.sql
@@ -0,0 +1,9 @@
CREATE TABLE public.people
(
    id integer GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    generated text GENERATED ALWAYS AS (id || first_name) STORED,
    first_name text
);

INSERT INTO public.people("first_name")
VALUES ('bob');
36 changes: 28 additions & 8 deletions docs/commands/restore.md
@@ -45,6 +45,7 @@ Mostly it supports the same flags as the `pg_restore` utility, with some extra f
--no-table-access-method do not restore table access methods
--no-tablespaces do not restore tablespace assignments
--on-conflict-do-nothing add ON CONFLICT DO NOTHING to INSERT commands
--overriding-system-value use OVERRIDING SYSTEM VALUE clause for INSERTs
--pgzip use pgzip decompression instead of gzip
-p, --port int database server port number (default 5432)
--restore-in-order restore tables in topological order, ensuring that dependent tables are not restored until the tables they depend on have been restored
@@ -70,28 +71,47 @@ Mostly it supports the same flags as the `pg_restore` utility, with some extra f

Insert commands are a lot slower than `COPY` commands. Use this feature only when necessary.

By default, Greenmask restores data using the `COPY` command. If you prefer to restore data using `INSERT` commands, you can
use the `--inserts` flag. This flag allows you to manage errors that occur during the execution of INSERT commands. By
configuring an error and constraint [exclusion list in the config](../configuration.md#restoration-error-exclusion),
you can skip certain errors and continue inserting subsequent rows from the dump.

This can be useful when adding new records to an existing dump, but you don't want the process to stop if some records
already exist in the database or violate certain constraints.

By adding the `--on-conflict-do-nothing` flag, Greenmask generates `INSERT` statements with the `ON CONFLICT DO NOTHING`
clause, similar to the original pg_dump option. However, this approach only works for unique or exclusion constraints.
If a foreign key is missing in the referenced table or any other constraint is violated, the insertion will still fail.
To handle these issues, you can define
an [exclusion list in the config](../configuration.md#restoration-error-exclusion).

```shell title="example with inserts and on conflict do nothing"
greenmask --config=config.yml restore DUMP_ID --inserts --on-conflict-do-nothing
```
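
To illustrate the effect of the flag, the statement below is a rough sketch of an insert with the clause applied; the table and values are made up for the example and are not literal Greenmask output:

```postgresql title="illustrative INSERT with ON CONFLICT DO NOTHING"
-- Hypothetical table: if a row with the same primary key already exists in the target,
-- the conflicting insert is skipped instead of aborting the restore.
INSERT INTO public.users ("user_id", "email")
VALUES (42, 'bob@example.com')
ON CONFLICT DO NOTHING;
```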

By adding the `--overriding-system-value` flag, Greenmask generates `INSERT` statements with the `OVERRIDING SYSTEM VALUE`
clause, which allows you to insert data into identity columns.

```postgresql title="example of GENERATED ALWAYS AS IDENTITY column"
CREATE TABLE people (
    id integer GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    generated text GENERATED ALWAYS AS (id || first_name) STORED,
    first_name text
);
```

```shell title="example with inserts"
greenmask --config=config.yml restore DUMP_ID --inserts --overriding-system-value
```
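
To see why the clause matters, here is a sketch of restoring the table above with plain `INSERT` statements; the statements are illustrative rather than literal Greenmask output:

```postgresql title="why OVERRIDING SYSTEM VALUE is needed (illustrative)"
-- Fails: PostgreSQL rejects explicit values for a GENERATED ALWAYS AS IDENTITY column.
INSERT INTO people ("id", "first_name") VALUES (1, 'bob');

-- Works: the clause lets the restore keep the original id values from the dump.
-- The "generated" column is omitted because generated columns cannot accept inserted values at all.
INSERT INTO people ("id", "first_name") OVERRIDING SYSTEM VALUE VALUES (1, 'bob');
```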

### Restoration in topological order

By default, Greenmask restores tables in the order they are listed in the dump file. To restore tables in topological
order, use the `--restore-in-order` flag. This is particularly useful when your schema includes foreign key references and
you need to insert data in the correct order. Without this flag, you may encounter errors when inserting data into
tables with foreign key constraints.
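
For example (the schema below is illustrative and not part of Greenmask), a child table referencing a parent table can only be loaded after the parent:

```postgresql title="illustrative foreign key dependency"
CREATE TABLE authors (
    author_id integer PRIMARY KEY,
    name      text
);

CREATE TABLE books (
    book_id   integer PRIMARY KEY,
    author_id integer REFERENCES authors (author_id),
    title     text
);

-- Rows for "books" can be inserted only after the referenced rows exist in "authors";
-- --restore-in-order ensures "authors" is restored first.
```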

@@ -101,7 +121,6 @@ tables with cyclic dependencies.
tables with cyclic dependencies is to temporarily remove the foreign key constraint (to break the cycle), restore the
data, and then re-add the foreign key constraint once the data restoration is complete.
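
As a sketch of that workaround (table and constraint names are hypothetical):

```postgresql title="breaking a cyclic dependency (illustrative)"
-- departments.head_id -> employees and employees.department_id -> departments form a cycle.
ALTER TABLE departments DROP CONSTRAINT departments_head_id_fkey;

-- ... restore the data for both tables ...

ALTER TABLE departments
    ADD CONSTRAINT departments_head_id_fkey
    FOREIGN KEY (head_id) REFERENCES employees (employee_id);
```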


If your database has cyclic dependencies, you will be notified about it, but the restoration will continue.

```text
@@ -123,8 +142,9 @@ greenmask --config=config.yml restore latest --pgzip

The COPY command returns the error only on transaction commit. This means that if you have a large dump and an error
occurs, you will have to wait until the end of the transaction to see the error message. To avoid this, you can use the
`--batch-size` flag to specify the number of rows to insert in a single batch during the COPY command. If an error occurs
during the batch insertion, the error message will be displayed immediately. The data will be committed **only if all
batches are inserted successfully**.
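
For example, to fail fast on a large dump you might restore in batches (the batch size below is arbitrary):

```shell title="example with batch size"
greenmask --config=config.yml restore latest --batch-size 1000
```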

!!! warning
6 changes: 5 additions & 1 deletion internal/db/postgres/cmd/dump.go
@@ -40,6 +40,7 @@ import (
"github.com/greenmaskio/greenmask/internal/db/postgres/transformers/utils"
"github.com/greenmaskio/greenmask/internal/domains"
"github.com/greenmaskio/greenmask/internal/storages"
"github.com/greenmaskio/greenmask/pkg/toolkit"
)

const MetadataJsonFileName = "metadata.json"
@@ -58,6 +59,7 @@ type Dump struct {
schemaToc *toc.Toc
resultToc *toc.Toc
dumpedObjectSizes map[int32]storageDto.ObjectSizeStat
tableOidToDumpId map[toolkit.Oid]int32
tocFileSize int64
version int
blobs *entries.Blobs
@@ -81,6 +83,7 @@ func NewDump(cfg *domains.Config, st storages.Storager, registry *utils.Transfor
tmpDir: path.Join(cfg.Common.TempDirectory, fmt.Sprintf("%d", time.Now().UnixNano())),
dumpedObjectSizes: map[int32]storageDto.ObjectSizeStat{},
registry: registry,
tableOidToDumpId: make(map[toolkit.Oid]int32),
}
}

Expand Down Expand Up @@ -300,6 +303,7 @@ func (d *Dump) createTocEntries() error {
}
switch v := obj.(type) {
case *entries.Table:
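// Remember which table OID corresponds to which dump entry so the restore side can look up the table definition from metadata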
d.tableOidToDumpId[v.Oid] = entry.DumpId
d.dumpedObjectSizes[entry.DumpId] = storageDto.ObjectSizeStat{
Original: v.OriginalSize,
Compressed: v.CompressedSize,
Expand Down Expand Up @@ -414,7 +418,7 @@ func (d *Dump) writeMetaData(ctx context.Context, startedAt, completedAt time.Ti
cycles := d.context.Graph.GetCycledTables()
metadata, err := storageDto.NewMetadata(
d.resultToc, d.tocFileSize, startedAt, completedAt, d.config.Dump.Transformation, d.dumpedObjectSizes,
d.context.DatabaseSchema, d.dumpDependenciesGraph, d.sortedTablesDumpIds, cycles, d.tableOidToDumpId,
)
if err != nil {
return fmt.Errorf("unable build metadata: %w", err)
30 changes: 26 additions & 4 deletions internal/db/postgres/cmd/restore.go
@@ -36,13 +36,13 @@ import (
"golang.org/x/sync/errgroup"
"gopkg.in/yaml.v3"

"github.com/greenmaskio/greenmask/internal/domains"

"github.com/greenmaskio/greenmask/internal/db/postgres/pgrestore"
"github.com/greenmaskio/greenmask/internal/db/postgres/restorers"
"github.com/greenmaskio/greenmask/internal/db/postgres/storage"
"github.com/greenmaskio/greenmask/internal/db/postgres/toc"
"github.com/greenmaskio/greenmask/internal/domains"
"github.com/greenmaskio/greenmask/internal/storages"
"github.com/greenmaskio/greenmask/pkg/toolkit"
)

const (
Expand Down Expand Up @@ -72,6 +72,10 @@ const metadataObjectName = "metadata.json"

const dependenciesCheckInterval = 15 * time.Millisecond

var (
ErrTableDefinitionIsEmtpy = errors.New("table definition is empty: please re-dump the data using the latest version of greenmask if you want to use --inserts")
)

type Restore struct {
binPath string
dsn string
Expand Down Expand Up @@ -641,9 +645,13 @@ func (r *Restore) taskPusher(ctx context.Context, tasks chan restorers.RestoreTa
	switch *entry.Desc {
	case toc.TableDataDesc:
		if r.restoreOpt.Inserts || r.restoreOpt.OnConflictDoNothing {
			t, err := r.getTableDefinitionFromMeta(entry.DumpId)
			if err != nil {
				return fmt.Errorf("cannot get table definition from meta: %w", err)
			}
			task = restorers.NewTableRestorerInsertFormat(
				entry, t, r.st, r.restoreOpt.ExitOnError, r.restoreOpt.OnConflictDoNothing,
				r.cfg.ErrorExclusions, r.restoreOpt.Pgzip, r.restoreOpt.OverridingSystemValue,
			)
		} else {
			task = restorers.NewTableRestorer(
Expand Down Expand Up @@ -671,6 +679,20 @@ func (r *Restore) taskPusher(ctx context.Context, tasks chan restorers.RestoreTa
}
}

// getTableDefinitionFromMeta resolves the table definition stored in the dump metadata by its dump id.
// Dumps produced by older greenmask versions do not carry this mapping, hence the explicit error.
func (r *Restore) getTableDefinitionFromMeta(dumpId int32) (*toolkit.Table, error) {
	tableOid, ok := r.metadata.DumpIdsToTableOid[dumpId]
	if !ok {
		return nil, ErrTableDefinitionIsEmtpy
	}
	idx := slices.IndexFunc(r.metadata.DatabaseSchema, func(t *toolkit.Table) bool {
		return t.Oid == tableOid
	})
	if idx == -1 {
		panic(fmt.Sprintf("table with oid %d is not found in metadata", tableOid))
	}
	return r.metadata.DatabaseSchema[idx], nil
}

func (r *Restore) restoreWorker(ctx context.Context, tasks <-chan restorers.RestoreTask, id int) error {
// TODO: You should execute TX for each COPY stmt
conn, err := pgx.Connect(ctx, r.dsn)
2 changes: 1 addition & 1 deletion internal/db/postgres/context/pg_catalog.go
@@ -158,7 +158,7 @@ func getTables(
// Columns were already initialized during the transformer initialization
continue
}
columns, err := getColumnsConfig(ctx, tx, t.Oid, version, true)
if err != nil {
return nil, nil, fmt.Errorf("unable to collect table columns: %w", err)
}
3 changes: 2 additions & 1 deletion internal/db/postgres/context/schema.go
@@ -41,7 +41,8 @@ func getDatabaseSchema(

// fill columns
for _, table := range res {
// We do not exclude generated columns here, because the schema must be compared with the original
columns, err := getColumnsConfig(ctx, tx, table.Oid, version, false)
if err != nil {
return nil, err
}
8 changes: 6 additions & 2 deletions internal/db/postgres/context/table.go
@@ -83,7 +83,7 @@ func validateAndBuildTablesConfig(
table.Constraints = constraints

// Assign columns and transformersMap if were found
columns, err := getColumnsConfig(ctx, tx, table.Oid, version, true)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -230,7 +230,7 @@ func getTable(ctx context.Context, tx pgx.Tx, t *domains.Table) ([]*entries.Tabl
return tables, warnings, nil
}

func getColumnsConfig(ctx context.Context, tx pgx.Tx, oid toolkit.Oid, version int, excludeGenerated bool) ([]*toolkit.Column, error) {
defaultTypeMap := pgtype.NewMap()
var res []*toolkit.Column
buf := bytes.NewBuffer(nil)
Expand Down Expand Up @@ -260,6 +260,10 @@ func getColumnsConfig(ctx context.Context, tx pgx.Tx, oid toolkit.Oid, version i
		if err != nil {
			return nil, fmt.Errorf("cannot scan tableColumnQuery: %w", err)
		}
		// Skip generated columns, as they do not contain real data
		if excludeGenerated && column.IsGenerated {
			continue
		}
		column.CanonicalTypeName = column.TypeName
		// Getting canonical type name if exists. For instance - PostgreSQL type Integer is alias for int4
		// (int4 - canonical type name)
2 changes: 2 additions & 0 deletions internal/db/postgres/entries/table.go
@@ -130,6 +130,8 @@ func (t *Table) Entry() (*toc.Entry, error) {
}

func (t *Table) GetCopyFromStatement() (string, error) {
// We could generate an explicit column list for the COPY statement, but it’s not necessary because, by default,
// generated columns are excluded from the COPY operation.
query := fmt.Sprintf("COPY \"%s\".\"%s\" TO STDOUT", t.Schema, t.Name)
if t.Query != "" {
query = fmt.Sprintf("COPY (%s) TO STDOUT", t.Query)
2 changes: 2 additions & 0 deletions internal/db/postgres/pgrestore/pgrestore.go
@@ -96,6 +96,8 @@ type Options struct {
OnConflictDoNothing bool `mapstructure:"on-conflict-do-nothing"`
Inserts bool `mapstructure:"inserts"`
RestoreInOrder bool `mapstructure:"restore-in-order"`
// OverridingSystemValue is a custom option that allows using the OVERRIDING SYSTEM VALUE clause for INSERTs
OverridingSystemValue bool `mapstructure:"overriding-system-value"`
// Use pgzip decompression instead of gzip
Pgzip bool `mapstructure:"pgzip"`
BatchSize int64 `mapstructure:"batch-size"`