Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support/db: Remove ctx param from Begin/BeginTx/Rollback #3630

Merged
merged 9 commits into from
May 25, 2021
5 changes: 5 additions & 0 deletions services/horizon/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -12,6 +12,11 @@ file. This project adheres to [Semantic Versioning](http://semver.org/).
### DB State Migration

* This release comes with a small DB schema change (new multiplexed-account-related columns are incororated). It should not take more than five minutes to run due to new columns being NULL-able.

### Deprecations

* Deprecate `--captive-core-config-append-path` in favor of `--captive-core-config-path`. The difference between the two flags is that `--captive-core-config-path` will validate the configuration file to reject any fields which are not supported by captive core ([3629](https://github.com/stellar/go/pull/3629)).

### Deprecations

* Deprecate `--captive-core-config-append-path` in favor of `--captive-core-config-path`. The difference between the two flags is that `--captive-core-config-path` will validate the configuration file to reject any fields which are not supported by captive core ([3629](https://github.com/stellar/go/pull/3629)).
4 changes: 2 additions & 2 deletions services/horizon/internal/actions/orderbook_test.go
Original file line number Diff line number Diff line change
@@ -582,11 +582,11 @@ func TestOrderbookGetResource(t *testing.T) {
}
assert.NoError(t, batch.Exec(tt.Ctx))

assert.NoError(t, q.BeginTx(tt.Ctx, &sql.TxOptions{
assert.NoError(t, q.BeginTx(&sql.TxOptions{
Isolation: sql.LevelRepeatableRead,
ReadOnly: true,
}))
defer q.Rollback(tt.Ctx)
defer q.Rollback()

fullResponse := empty
fullResponse.Asks = []protocol.PriceLevel{
4 changes: 2 additions & 2 deletions services/horizon/internal/actions_path_test.go
Original file line number Diff line number Diff line change
@@ -50,11 +50,11 @@ func mockPathFindingClient(
router.Use(func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
s := session.Clone()
s.BeginTx(tt.Ctx, &sql.TxOptions{
s.BeginTx(&sql.TxOptions{
Isolation: sql.LevelRepeatableRead,
ReadOnly: true,
})
defer s.Rollback(tt.Ctx)
defer s.Rollback()

ctx := context.WithValue(
r.Context(),
8 changes: 4 additions & 4 deletions services/horizon/internal/db2/history/main.go
Original file line number Diff line number Diff line change
@@ -241,11 +241,11 @@ type IngestionQ interface {
QTransactions
QTrustLines

Begin(context.Context) error
BeginTx(context.Context, *sql.TxOptions) error
Commit(context.Context) error
Begin() error
BeginTx(*sql.TxOptions) error
Commit() error
CloneIngestionQ() IngestionQ
Rollback(context.Context) error
Rollback() error
GetTx() *sqlx.Tx
GetIngestVersion(context.Context) (int, error)
UpdateExpStateInvalid(context.Context, bool) error
20 changes: 10 additions & 10 deletions services/horizon/internal/db2/history/orderbook_test.go
Original file line number Diff line number Diff line change
@@ -18,8 +18,8 @@ func TestGetOrderBookSummaryRequiresTransaction(t *testing.T) {
_, err := q.GetOrderBookSummary(tt.Ctx, nativeAsset, eurAsset, 10)
assert.EqualError(t, err, "cannot be called outside of a transaction")

assert.NoError(t, q.Begin(tt.Ctx))
defer q.Rollback(tt.Ctx)
assert.NoError(t, q.Begin())
defer q.Rollback()

_, err = q.GetOrderBookSummary(tt.Ctx, nativeAsset, eurAsset, 10)
assert.EqualError(t, err, "should only be called in a repeatable read transaction")
@@ -220,11 +220,11 @@ func TestGetOrderBookSummary(t *testing.T) {
}
assert.NoError(t, batch.Exec(tt.Ctx))

assert.NoError(t, q.BeginTx(tt.Ctx, &sql.TxOptions{
assert.NoError(t, q.BeginTx(&sql.TxOptions{
Isolation: sql.LevelRepeatableRead,
ReadOnly: true,
}))
defer q.Rollback(tt.Ctx)
defer q.Rollback()

result, err := q.GetOrderBookSummary(tt.Ctx, nativeAsset, eurAsset, testCase.limit)
assert.NoError(t, err)
@@ -266,7 +266,7 @@ func TestGetOrderBookSummaryExcludesRemovedOffers(t *testing.T) {
}
assert.NoError(t, batch.Exec(tt.Ctx))

assert.NoError(t, q.BeginTx(tt.Ctx, &sql.TxOptions{
assert.NoError(t, q.BeginTx(&sql.TxOptions{
Isolation: sql.LevelRepeatableRead,
ReadOnly: true,
}))
@@ -276,7 +276,7 @@ func TestGetOrderBookSummaryExcludesRemovedOffers(t *testing.T) {
assert.Len(t, result.Asks, 2)
assert.Len(t, result.Bids, 1)

assert.NoError(t, q.Rollback(tt.Ctx))
assert.NoError(t, q.Rollback())

for i, offer := range offers {
var count int64
@@ -285,7 +285,7 @@ func TestGetOrderBookSummaryExcludesRemovedOffers(t *testing.T) {
assert.Equal(t, int64(1), count)
}

assert.NoError(t, q.BeginTx(tt.Ctx, &sql.TxOptions{
assert.NoError(t, q.BeginTx(&sql.TxOptions{
Isolation: sql.LevelRepeatableRead,
ReadOnly: true,
}))
@@ -295,13 +295,13 @@ func TestGetOrderBookSummaryExcludesRemovedOffers(t *testing.T) {
assert.Len(t, result.Asks, 0)
assert.Len(t, result.Bids, 0)

assert.NoError(t, q.Rollback(tt.Ctx))
assert.NoError(t, q.Rollback())

count, err := q.CompactOffers(tt.Ctx, 1000)
assert.NoError(t, err)
assert.Equal(t, int64(len(offers)), count)

assert.NoError(t, q.BeginTx(tt.Ctx, &sql.TxOptions{
assert.NoError(t, q.BeginTx(&sql.TxOptions{
Isolation: sql.LevelRepeatableRead,
ReadOnly: true,
}))
@@ -311,5 +311,5 @@ func TestGetOrderBookSummaryExcludesRemovedOffers(t *testing.T) {
assert.Len(t, result.Asks, 0)
assert.Len(t, result.Bids, 0)

assert.NoError(t, q.Rollback(tt.Ctx))
assert.NoError(t, q.Rollback())
}
8 changes: 4 additions & 4 deletions services/horizon/internal/db2/history/trust_lines_test.go
Original file line number Diff line number Diff line change
@@ -404,8 +404,8 @@ func TestAssetsForAddressRequiresTransaction(t *testing.T) {
_, _, err := q.AssetsForAddress(tt.Ctx, eurTrustLine.Data.TrustLine.AccountId.Address())
assert.EqualError(t, err, "cannot be called outside of a transaction")

assert.NoError(t, q.Begin(tt.Ctx))
defer q.Rollback(tt.Ctx)
assert.NoError(t, q.Begin())
defer q.Rollback()

_, _, err = q.AssetsForAddress(tt.Ctx, eurTrustLine.Data.TrustLine.AccountId.Address())
assert.EqualError(t, err, "should only be called in a repeatable read transaction")
@@ -451,12 +451,12 @@ func TestAssetsForAddress(t *testing.T) {
_, err = q.InsertTrustLine(tt.Ctx, brlTrustLine)
tt.Assert.NoError(err)

err = q.BeginTx(tt.Ctx, &sql.TxOptions{
err = q.BeginTx(&sql.TxOptions{
Isolation: sql.LevelRepeatableRead,
ReadOnly: true,
})
assert.NoError(t, err)
defer q.Rollback(tt.Ctx)
defer q.Rollback()

assets, balances, err := q.AssetsForAddress(tt.Ctx, usdTrustLine.Data.TrustLine.AccountId.Address())
tt.Assert.NoError(err)
4 changes: 2 additions & 2 deletions services/horizon/internal/httpx/handler.go
Original file line number Diff line number Diff line change
@@ -104,14 +104,14 @@ func repeatableReadStream(

return func() ([]sse.Event, error) {
if session != nil {
err := session.BeginTx(r.Context(), &sql.TxOptions{
err := session.BeginTx(&sql.TxOptions{
Isolation: sql.LevelRepeatableRead,
ReadOnly: true,
})
if err != nil {
return nil, errors.Wrap(err, "Error starting repeatable read transaction")
}
defer session.Rollback(r.Context())
defer session.Rollback()
}

return generateEvents()
6 changes: 3 additions & 3 deletions services/horizon/internal/httpx/middleware.go
Original file line number Diff line number Diff line change
@@ -298,7 +298,7 @@ func (m *StateMiddleware) WrapFunc(h http.HandlerFunc) http.HandlerFunc {
// Otherwise, because the ingestion system is running concurrently with this request,
// it is possible to have one read fetch data from ledger N and another read
// fetch data from ledger N+1 .
err := session.BeginTx(ctx, &sql.TxOptions{
err := session.BeginTx(&sql.TxOptions{
Isolation: sql.LevelRepeatableRead,
ReadOnly: true,
})
@@ -307,7 +307,7 @@ func (m *StateMiddleware) WrapFunc(h http.HandlerFunc) http.HandlerFunc {
problem.Render(ctx, w, err)
return
}
defer session.Rollback(ctx)
defer session.Rollback()

if !m.NoStateVerification {
stateInvalid, invalidErr := q.GetExpStateInvalid(ctx)
@@ -336,7 +336,7 @@ func (m *StateMiddleware) WrapFunc(h http.HandlerFunc) http.HandlerFunc {
// otherwise, the stream will not pick up updates occurring in future
// ledgers
if sseRequest {
if err = session.Rollback(ctx); err != nil {
if err = session.Rollback(); err != nil {
problem.Render(
ctx,
w,
17 changes: 8 additions & 9 deletions services/horizon/internal/httpx/stream_handler_test.go
Original file line number Diff line number Diff line change
@@ -13,7 +13,6 @@ import (
"testing"

"github.com/go-chi/chi"
"github.com/stretchr/testify/mock"

"github.com/stellar/go/services/horizon/internal/actions"
horizonContext "github.com/stellar/go/services/horizon/internal/context"
@@ -478,17 +477,17 @@ func TestRepeatableReadStream(t *testing.T) {
}

session := &db.MockSession{}
session.On("BeginTx", mock.Anything, &sql.TxOptions{
session.On("BeginTx", &sql.TxOptions{
Isolation: sql.LevelRepeatableRead,
ReadOnly: true,
}).Return(nil).Once()
session.On("Rollback", mock.Anything).Return(nil).Once()
session.On("Rollback").Return(nil).Once()

session.On("BeginTx", mock.Anything, &sql.TxOptions{
session.On("BeginTx", &sql.TxOptions{
Isolation: sql.LevelRepeatableRead,
ReadOnly: true,
}).Return(nil).Once()
session.On("Rollback", mock.Anything).Return(nil).Once()
session.On("Rollback").Return(nil).Once()

request := streamRequest(t, "limit=2")
request = request.WithContext(context.WithValue(
@@ -517,17 +516,17 @@ func TestRepeatableReadStream(t *testing.T) {
}

session := &db.MockSession{}
session.On("BeginTx", mock.Anything, &sql.TxOptions{
session.On("BeginTx", &sql.TxOptions{
Isolation: sql.LevelRepeatableRead,
ReadOnly: true,
}).Return(nil).Once()
session.On("Rollback", mock.Anything).Return(nil).Once()
session.On("Rollback").Return(nil).Once()

session.On("BeginTx", mock.Anything, &sql.TxOptions{
session.On("BeginTx", &sql.TxOptions{
Isolation: sql.LevelRepeatableRead,
ReadOnly: true,
}).Return(nil).Once()
session.On("Rollback", mock.Anything).Return(nil).Once()
session.On("Rollback").Return(nil).Once()

request := streamRequest(t, "")
request = request.WithContext(context.WithValue(
14 changes: 7 additions & 7 deletions services/horizon/internal/ingest/build_state_test.go
Original file line number Diff line number Diff line change
@@ -50,8 +50,8 @@ func (s *BuildStateTestSuite) SetupTest() {
}
s.system.initMetrics()

s.historyQ.On("Begin", s.ctx).Return(nil).Once()
s.historyQ.On("Rollback", s.ctx).Return(nil).Once()
s.historyQ.On("Begin").Return(nil).Once()
s.historyQ.On("Rollback").Return(nil).Once()

s.ledgerBackend.On("IsPrepared", s.ctx, ledgerbackend.UnboundedRange(63)).Return(false, nil).Once()
s.ledgerBackend.On("PrepareRange", s.ctx, ledgerbackend.UnboundedRange(63)).Return(nil).Once()
@@ -136,7 +136,7 @@ func (s *BuildStateTestSuite) TestRangeNotPreparedSuccessPrepareGetLedgerFail()
func (s *BuildStateTestSuite) TestBeginReturnsError() {
// Recreate mock in this single test to remove assertions.
*s.historyQ = mockDBQ{}
s.historyQ.On("Begin", s.ctx).Return(errors.New("my error")).Once()
s.historyQ.On("Begin").Return(errors.New("my error")).Once()

next, err := buildState{checkpointLedger: s.checkpointLedger}.run(s.system)
s.Assert().Error(err)
@@ -316,7 +316,7 @@ func (s *BuildStateTestSuite) TestUpdateCommitReturnsError() {
s.historyQ.On("UpdateIngestVersion", s.ctx, CurrentVersion).
Return(nil).
Once()
s.historyQ.On("Commit", s.ctx).
s.historyQ.On("Commit").
Return(errors.New("my error")).
Once()
next, err := buildState{checkpointLedger: s.checkpointLedger}.run(s.system)
@@ -338,7 +338,7 @@ func (s *BuildStateTestSuite) TestBuildStateSucceeds() {
s.historyQ.On("UpdateIngestVersion", s.ctx, CurrentVersion).
Return(nil).
Once()
s.historyQ.On("Commit", s.ctx).
s.historyQ.On("Commit").
Return(nil).
Once()

@@ -366,7 +366,7 @@ func (s *BuildStateTestSuite) TestUpdateCommitReturnsErrorStop() {
s.historyQ.On("UpdateIngestVersion", s.ctx, CurrentVersion).
Return(nil).
Once()
s.historyQ.On("Commit", s.ctx).
s.historyQ.On("Commit").
Return(errors.New("my error")).
Once()
next, err := buildState{checkpointLedger: s.checkpointLedger, stop: true}.run(s.system)
@@ -388,7 +388,7 @@ func (s *BuildStateTestSuite) TestBuildStateSucceedStop() {
s.historyQ.On("UpdateIngestVersion", s.ctx, CurrentVersion).
Return(nil).
Once()
s.historyQ.On("Commit", s.ctx).
s.historyQ.On("Commit").
Return(nil).
Once()

Loading