Skip to content

Commit

Permalink
Merge pull request #123 from hyperledger/eventstreams-ws
Browse files Browse the repository at this point in the history
Refactor WebSockets to fix window of duplicate delivery after reconect
  • Loading branch information
nguyer authored Feb 13, 2024
2 parents 80d20a1 + fd2e3f4 commit ea9c63d
Show file tree
Hide file tree
Showing 32 changed files with 1,268 additions and 778 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ endef
$(eval $(call makemock, $$(DBMIGRATE_PATH), Driver, dbmigratemocks))
$(eval $(call makemock, pkg/httpserver, GoHTTPServer, httpservermocks))
$(eval $(call makemock, pkg/auth, Plugin, authmocks))
$(eval $(call makemock, pkg/wsserver, WebSocketChannels, wsservermocks))
$(eval $(call makemock, pkg/wsserver, Protocol, wsservermocks))
$(eval $(call makemock, pkg/wsserver, WebSocketServer, wsservermocks))
$(eval $(call makemock, pkg/dbsql, CRUD, crudmocks))

Expand Down
4 changes: 3 additions & 1 deletion examples/ffpubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ func setup(ctx context.Context) (pubSubESManager, *inMemoryStream, func()) {
// Use SQLite in-memory DB
conf := config.RootSection("ffpubsub")
eventstreams.InitConfig(conf)
wsConf := conf.SubSection("ws")
wsserver.InitConfig(wsConf)
dbConf := conf.SubSection("sqlite")
dbsql.InitSQLiteConfig(dbConf)
dbConf.Set(dbsql.SQLConfMigrationsAuto, true)
Expand All @@ -139,7 +141,7 @@ func setup(ctx context.Context) (pubSubESManager, *inMemoryStream, func()) {
sql, err := dbsql.NewSQLiteProvider(ctx, dbConf)
assertNoError(err)

wsServer := wsserver.NewWebSocketServer(ctx)
wsServer := wsserver.NewWebSocketServer(ctx, wsserver.GenerateConfig(wsConf))
server := httptest.NewServer(http.HandlerFunc(wsServer.Handler))
u, err := url.Parse(server.URL)
assertNoError(err)
Expand Down
2 changes: 1 addition & 1 deletion mocks/authmocks/plugin.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 19 additions & 1 deletion mocks/crudmocks/crud.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion mocks/dbmigratemocks/driver.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion mocks/httpservermocks/go_http_server.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

64 changes: 64 additions & 0 deletions mocks/wsservermocks/protocol.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

68 changes: 0 additions & 68 deletions mocks/wsservermocks/web_socket_channels.go

This file was deleted.

62 changes: 29 additions & 33 deletions mocks/wsservermocks/web_socket_server.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 9 additions & 1 deletion pkg/dbsql/crud.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -117,6 +117,7 @@ type CRUD[T Resource] interface {
NewFilterBuilder(ctx context.Context) ffapi.FilterBuilder
NewUpdateBuilder(ctx context.Context) ffapi.UpdateBuilder
GetQueryFactory() ffapi.QueryFactory
TableAlias() string
Scoped(scope sq.Eq) CRUD[T] // allows dynamic scoping to a collection
}

Expand Down Expand Up @@ -151,6 +152,13 @@ func (c *CrudBase[T]) Scoped(scope sq.Eq) CRUD[T] {
return &cScoped
}

func (c *CrudBase[T]) TableAlias() string {
if c.ReadTableAlias != "" {
return c.ReadTableAlias
}
return c.Table
}

func (c *CrudBase[T]) GetQueryFactory() ffapi.QueryFactory {
return c.QueryFactory
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/dbsql/crud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,9 @@ func TestLeftJOINExample(t *testing.T) {
ctx := context.Background()

crudables := newCRUDCollection(sql.db, "ns1")
assert.Equal(t, "crudables", crudables.TableAlias())
linkables := newLinkableCollection(sql.db, "ns1")
assert.Equal(t, "l", linkables.TableAlias())

c1 := &TestCRUDable{
ResourceBase: ResourceBase{
Expand Down
14 changes: 14 additions & 0 deletions pkg/dbsql/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,20 @@ func (s *Database) DeleteTx(ctx context.Context, table string, tx *TXWrapper, q
return nil
}

func (s *Database) ExecTx(ctx context.Context, table string, tx *TXWrapper, sqlQuery string, args []interface{}) (sql.Result, error) {
l := log.L(ctx)
before := time.Now()
l.Tracef(`SQL-> exec: %s args: %+v`, sqlQuery, args)
res, err := tx.sqlTX.ExecContext(ctx, sqlQuery, args...)
if err != nil {
l.Errorf(`SQL exec: %s sql=[ %s ]: %s`, err, sqlQuery, err)
return nil, i18n.WrapError(ctx, err, i18n.MsgDBExecFailed)
}
ra, _ := res.RowsAffected()
l.Debugf(`SQL<- exec: %s affected=%d (%.2fms)`, table, ra, floatMillisSince(before))
return res, nil
}

func (s *Database) UpdateTx(ctx context.Context, table string, tx *TXWrapper, q sq.UpdateBuilder, postCommit func()) (int64, error) {
l := log.L(ctx)
sqlQuery, args, err := q.PlaceholderFormat(s.features.PlaceholderFormat).ToSql()
Expand Down
27 changes: 27 additions & 0 deletions pkg/dbsql/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,3 +595,30 @@ func TestInsertTxRowsIncompleteReturn(t *testing.T) {
err = s.InsertTxRows(ctx, "table1", tx, sb, nil, []int64{1, 2}, false)
assert.Regexp(t, "FF00177", err)
}

func TestExecTxOk(t *testing.T) {
s, mdb := NewMockProvider().UTInit()
mdb.ExpectBegin()
mdb.ExpectExec("INSERT.*").WillReturnResult(driver.ResultNoRows)
ctx, tx, _, err := s.BeginOrUseTx(context.Background())
assert.NoError(t, err)
q := sq.Insert("mytable").Columns("some").Values("thing")
sqlQuery, args, err := q.PlaceholderFormat(s.features.PlaceholderFormat).ToSql()
assert.NoError(t, err)
res, err := s.ExecTx(ctx, "mytable", tx, sqlQuery, args)
assert.NoError(t, err)
assert.NotNil(t, res)
}

func TestExecTxFail(t *testing.T) {
s, mdb := NewMockProvider().UTInit()
mdb.ExpectBegin()
mdb.ExpectExec("INSERT.*").WillReturnError(fmt.Errorf("pop"))
ctx, tx, _, err := s.BeginOrUseTx(context.Background())
assert.NoError(t, err)
q := sq.Insert("mytable").Columns("some").Values("thing")
sqlQuery, args, err := q.PlaceholderFormat(s.features.PlaceholderFormat).ToSql()
assert.NoError(t, err)
_, err = s.ExecTx(ctx, "mytable", tx, sqlQuery, args)
assert.Regexp(t, "pop", err)
}
Loading

0 comments on commit ea9c63d

Please sign in to comment.