Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support/db: PoC: Use COPY instead of INSERT in BatchInsertBuilder #4094

Closed
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,10 @@ func (i *effectBatchInsertBuilder) Add(
"history_account_id": accountID,
"address_muxed": muxedAccount,
"history_operation_id": operationID,
"\"order\"": order,
"order": order,
"type": effectType,
"details": details,
// we need to convert to string in order to make the lib/pq's COPY escaping happy
"details": string(details),
})
}

Expand Down
6 changes: 5 additions & 1 deletion services/horizon/internal/db2/history/liquidity_pools.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@ type LiquidityPool struct {
type LiquidityPoolAssetReserves []LiquidityPoolAssetReserve

func (c LiquidityPoolAssetReserves) Value() (driver.Value, error) {
return json.Marshal(c)
b, err := json.Marshal(c)
if err != nil {
return nil, err
}
return string(b), err
}

func (c *LiquidityPoolAssetReserves) Scan(value interface{}) error {
Expand Down
12 changes: 10 additions & 2 deletions services/horizon/internal/db2/history/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,11 @@ type ExpAssetStatAccounts struct {
}

func (e ExpAssetStatAccounts) Value() (driver.Value, error) {
return json.Marshal(e)
b, err := json.Marshal(e)
if err != nil {
return nil, err
}
return string(b), nil
}

func (e *ExpAssetStatAccounts) Scan(src interface{}) error {
Expand Down Expand Up @@ -403,7 +407,11 @@ type ExpAssetStatBalances struct {
}

func (e ExpAssetStatBalances) Value() (driver.Value, error) {
return json.Marshal(e)
b, err := json.Marshal(e)
if err != nil {
return nil, err
}
return string(b), nil
}

func (e *ExpAssetStatBalances) Scan(src interface{}) error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,12 @@ func (i *operationBatchInsertBuilder) Add(
sourceAccountMuxed null.String,
) error {
return i.builder.Row(ctx, map[string]interface{}{
"id": id,
"transaction_id": transactionID,
"application_order": applicationOrder,
"type": operationType,
"details": details,
"id": id,
"transaction_id": transactionID,
"application_order": applicationOrder,
"type": operationType,
// we need to convert to string in order to make the lib/pq's COPY escaping happy
"details": string(details),
"source_account": sourceAccount,
"source_account_muxed": sourceAccountMuxed,
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
// rows into the history_trades table
type InsertTrade struct {
HistoryOperationID int64 `db:"history_operation_id"`
Order int32 `db:"\"order\""`
Order int32 `db:"order"`
LedgerCloseTime time.Time `db:"ledger_closed_at"`

CounterAssetID int64 `db:"counter_asset_id"`
Expand Down
4 changes: 1 addition & 3 deletions services/horizon/internal/db2/history/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,9 +231,7 @@ func TestInsertTransactionDoesNotAllowDuplicateIndex(t *testing.T) {
tt.Assert.NoError(insertBuilder.Add(tt.Ctx, secondTransaction, sequence))
tt.Assert.EqualError(
insertBuilder.Exec(tt.Ctx),
"error adding values while inserting to history_transactions: "+
"exec failed: pq: duplicate key value violates unique constraint "+
"\"hs_transaction_by_id\"",
"pq: duplicate key value violates unique constraint \"hs_transaction_by_id\"",
)

ledger := Ledger{
Expand Down
141 changes: 89 additions & 52 deletions support/db/batch_insert_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import (
"reflect"
"sort"

sq "github.com/Masterminds/squirrel"
"github.com/jmoiron/sqlx"
"github.com/lib/pq"
"github.com/stellar/go/support/errors"
)

Expand All @@ -15,15 +16,11 @@ import (
// It is NOT safe for concurrent use.
type BatchInsertBuilder struct {
Table *Table
// MaxBatchSize defines the maximum size of a batch. If this number is
// reached after calling Row() it will call Exec() immediately inserting
// all rows to a DB.
// Zero (default) will not add rows until explicitly calling Exec.
// TODO: now unused
MaxBatchSize int

// Suffix adds a sql expression to the end of the query (e.g. an ON CONFLICT clause)
Suffix string

Suffix string
columns []string
rows [][]interface{}
rowStructType reflect.Type
Expand All @@ -36,7 +33,6 @@ type BatchInsertBuilder struct {
func (b *BatchInsertBuilder) Row(ctx context.Context, row map[string]interface{}) error {
if b.columns == nil {
b.columns = make([]string, 0, len(row))
b.rows = make([][]interface{}, 0)

for column := range row {
b.columns = append(b.columns, column)
Expand All @@ -60,18 +56,12 @@ func (b *BatchInsertBuilder) Row(ctx context.Context, row map[string]interface{}

b.rows = append(b.rows, rowSlice)

// Call Exec when MaxBatchSize is reached.
if len(b.rows) == b.MaxBatchSize {
return b.Exec(ctx)
}

return nil
}

func (b *BatchInsertBuilder) RowStruct(ctx context.Context, row interface{}) error {
if b.columns == nil {
b.columns = ColumnsForStruct(row)
b.rows = make([][]interface{}, 0)
}

rowType := reflect.TypeOf(row)
Expand All @@ -89,54 +79,101 @@ func (b *BatchInsertBuilder) RowStruct(ctx context.Context, row interface{}) err
for i, rval := range rvals {
columnValues[i] = rval.Interface()
}

b.rows = append(b.rows, columnValues)

// Call Exec when MaxBatchSize is reached.
if len(b.rows) == b.MaxBatchSize {
return b.Exec(ctx)
}

return nil
}

func (b *BatchInsertBuilder) insertSQL() sq.InsertBuilder {
insertStatement := sq.Insert(b.Table.Name).Columns(b.columns...)
if len(b.Suffix) > 0 {
return insertStatement.Suffix(b.Suffix)
}
return insertStatement
}

// Exec inserts rows in batches. In case of errors it's possible that some batches
// were added so this should be run in a DB transaction for easy rollbacks.
func (b *BatchInsertBuilder) Exec(ctx context.Context) error {
sql := b.insertSQL()
paramsCount := 0

for _, row := range b.rows {
sql = sql.Values(row...)
paramsCount += len(row)

if paramsCount > postgresQueryMaxParams-2*len(b.columns) {
_, err := b.Table.Session.Exec(ctx, sql)
if err != nil {
return errors.Wrap(err, fmt.Sprintf("error adding values while inserting to %s", b.Table.Name))
}
paramsCount = 0
sql = b.insertSQL()
func (b *BatchInsertBuilder) Exec(ctx context.Context) (err error) {
if len(b.rows) == 0 {
// Nothing to do
return nil
}
var (
bookKeepTx bool
stmt *sqlx.Stmt
)

// cleanup
defer func() {
if stmt != nil {
stmt.Close()
}
if bookKeepTx && b.Table.Session.GetTx() != nil {
b.Table.Session.Rollback()
}
}()

// Begin a transaction if it wasn't started externally
if b.Table.Session.GetTx() == nil {
if err = b.Table.Session.Begin(); err != nil {
return
}
bookKeepTx = true
}

// Insert last batch
if paramsCount > 0 {
_, err := b.Table.Session.Exec(ctx, sql)
if err != nil {
return errors.Wrap(err, fmt.Sprintf("error adding values while inserting to %s", b.Table.Name))
// Ensure there is temporary table were to COPY the content
// and later merge into the final table (needed to support the insert suffix)
_, err = b.Table.Session.GetTx().ExecContext(
ctx,
fmt.Sprintf("CREATE TEMP TABLE IF NOT EXISTS tmp_%s (LIKE %s INCLUDING DEFAULTS) ON COMMIT DROP", b.Table.Name, b.Table.Name),
)
if err != nil {
return
}

// Start COPY
stmt, err = b.Table.Session.GetTx().PreparexContext(ctx, pq.CopyIn("tmp_"+b.Table.Name, b.columns...))
if err != nil {
return
}

// COPY values into temporary table
for _, r := range b.rows {
if _, err = stmt.ExecContext(ctx, r...); err != nil {
return
}

}
if _, err = stmt.ExecContext(ctx); err != nil {
// wrap up statement execution
return
}

// Clear the rows so user can reuse it for batch inserting to a single table
b.rows = make([][]interface{}, 0)
return nil
err = stmt.Close()
// mark statement as closed
stmt = nil
if err != nil {
return
}

// Merge temporary table with final table, using insertion Suffix
_, err = b.Table.Session.GetTx().ExecContext(
ctx,
fmt.Sprintf("INSERT INTO %s SELECT * FROM tmp_%s %s", b.Table.Name, b.Table.Name, b.Suffix),
)
if err != nil {
return
}

// Truncate temporary table
// TODO: we could avoid this if we have guarantees of Exec() only being called once
// per transaction
_, err = b.Table.Session.GetTx().ExecContext(
ctx,
fmt.Sprintf("TRUNCATE TABLE tmp_%s", b.Table.Name),
)
if err != nil {
return
}

if bookKeepTx {
err = b.Table.Session.Commit()
}
if err == nil {
// Clear the rows so user can reuse it for batch inserting to a single table
b.rows = make([][]interface{}, 0)
}
return
}
3 changes: 1 addition & 2 deletions support/db/batch_insert_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,7 @@ func TestBatchInsertBuilder(t *testing.T) {

err = insertBuilder.Exec(ctx)
assert.EqualError(
t, err, "error adding values while inserting to people: exec failed: pq:"+
" duplicate key value violates unique constraint \"people_pkey\"",
t, err, "pq: duplicate key value violates unique constraint \"people_pkey\"",
)

insertBuilder.Suffix = "ON CONFLICT (name) DO NOTHING"
Expand Down