Single coord #280

Merged · 8 commits · Aug 22, 2023
4 changes: 3 additions & 1 deletion coordinator/app/app.go
@@ -29,6 +29,8 @@ func NewApp(c coordinator.Coordinator) *App {
func (app *App) Run() error {
spqrlog.Zero.Info().Msg("running coordinator app")

app.coordinator.RunCoordinator(context.TODO())

wg := &sync.WaitGroup{}

wg.Add(2)
@@ -104,7 +106,7 @@ func (app *App) ServeGrpc(wg *sync.WaitGroup) error {
protos.RegisterShardServiceServer(serv, shardServ)

httpAddr := config.CoordinatorConfig().HttpAddr

spqrlog.Zero.Info().
Str("address", httpAddr).
Msg("serve grpc coordinator service")
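Note that the app.go change above starts the coordinator with `app.coordinator.RunCoordinator(context.TODO())`. Since the lock-acquisition loop added below honors `ctx.Done()`, a caller could instead pass a cancellable context so a standby instance stops waiting for the lock on shutdown. A minimal, hypothetical sketch — only the RunCoordinator signature is taken from this PR, the rest is illustrative:

```go
package app

import (
	"context"
	"os"
	"os/signal"
)

// Coordinator mirrors only the method added in this PR.
type Coordinator interface {
	RunCoordinator(ctx context.Context)
}

// runWithShutdown cancels the context on SIGINT, so a standby coordinator
// stops polling for the lock instead of blocking forever.
func runWithShutdown(c Coordinator) {
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
	defer stop()

	c.RunCoordinator(ctx)
}
```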
4 changes: 4 additions & 0 deletions coordinator/coordinator.go
@@ -1,6 +1,8 @@
package coordinator

import (
"context"

"github.com/pg-sharding/spqr/pkg/meta"

"github.com/pg-sharding/spqr/pkg/clientinteractor"
@@ -9,4 +11,6 @@ import (
type Coordinator interface {
clientinteractor.Interactor
meta.EntityMgr

RunCoordinator(ctx context.Context)
}
37 changes: 30 additions & 7 deletions coordinator/provider/coordinator.go
@@ -256,22 +256,46 @@ func (qc *qdbCoordinator) watchRouters(ctx context.Context) {
}
}

// NewCoordinator side efferc: runs async goroutine that checks
// spqr router`s availability
func NewCoordinator(db qdb.XQDB) *qdbCoordinator {
cc := &qdbCoordinator{
return &qdbCoordinator{
db: db,
}
}

func (cc *qdbCoordinator) lockCoordinator(ctx context.Context) bool {
if cc.db.TryCoordinatorLock(context.TODO()) != nil {
for {
select {
case <-ctx.Done():
return false
case <-time.After(time.Second):
if cc.db.TryCoordinatorLock(context.TODO()) == nil {
return true
}
spqrlog.Zero.Debug().Msg("qdb already taken, waiting for connection")
}
}
}

return true
}

// RunCoordinator side effect: it runs an asynchronous goroutine
// that checks the availability of the SPQR router
func (cc *qdbCoordinator) RunCoordinator(ctx context.Context) {
if !cc.lockCoordinator(ctx) {
return
}

ranges, err := db.ListKeyRanges(context.TODO())
ranges, err := cc.db.ListKeyRanges(context.TODO())
if err != nil {
spqrlog.Zero.Error().
Err(err).
Msg("faild to list key ranges")
}

for _, r := range ranges {
tx, err := db.GetTransferTx(context.TODO(), r.KeyRangeID)
tx, err := cc.db.GetTransferTx(context.TODO(), r.KeyRangeID)
if err != nil {
continue
}
@@ -290,14 +314,13 @@ func NewCoordinator(db qdb.XQDB) *qdbCoordinator {
datatransfers.ResolvePreparedTransaction(context.TODO(), tx.FromShardId, tx.FromTxName, false)
}

err = db.RemoveTransferTx(context.TODO(), r.KeyRangeID)
err = cc.db.RemoveTransferTx(context.TODO(), r.KeyRangeID)
if err != nil {
spqrlog.Zero.Error().Err(err).Msg("error removing from qdb")
}
}

go cc.watchRouters(context.TODO())
return cc
}

// traverseRouters traverse each route and run callback for each of them
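In short, lockCoordinator makes one immediate attempt at the qdb lock and, on failure, retries every second until the lock is acquired or the context is cancelled; RunCoordinator only proceeds to recover transfer transactions and watch routers once it holds the lock. A small self-contained sketch of that retry pattern — the tryLock stub below is illustrative, not the PR's API:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

var errHeld = errors.New("coordinator lock already held")

// tryLock stands in for qdb.TryCoordinatorLock in this sketch.
func tryLock() error { return errHeld }

// acquire mirrors the loop in lockCoordinator: one immediate attempt,
// then a retry each second until success or context cancellation.
func acquire(ctx context.Context) bool {
	if tryLock() == nil {
		return true
	}
	for {
		select {
		case <-ctx.Done():
			return false
		case <-time.After(time.Second):
			if tryLock() == nil {
				return true
			}
			fmt.Println("lock still taken, retrying")
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	fmt.Println("acquired:", acquire(ctx))
}
```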
36 changes: 36 additions & 0 deletions qdb/etcdqdb.go
@@ -10,6 +10,7 @@ import (
"time"

clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/clientv3util"
"go.etcd.io/etcd/client/v3/concurrency"
"google.golang.org/grpc"

@@ -52,6 +53,9 @@ (
routersNamespace = "/routers"
shardingRulesNamespace = "/sharding_rules"
shardsNamespace = "/shards"
CoordKeepAliveTtl = 3
coordLockKey = "coordinator_exists"
coordLockVal = "exists"
)

func keyLockPath(key string) string {
@@ -531,6 +535,38 @@ func (q *EtcdQDB) RemoveTransferTx(ctx context.Context, key string) error {
return nil
}

// ==============================================================================
// COORDINATOR LOCK
// ==============================================================================

func (q *EtcdQDB) TryCoordinatorLock(ctx context.Context) error {
resp, err := q.cli.Lease.Grant(ctx, CoordKeepAliveTtl)
if err != nil {
spqrlog.Zero.Error().Err(err).Msg("Failed to make lease")
return err
}

op := clientv3.OpPut(coordLockKey, coordLockVal, clientv3.WithLease(clientv3.LeaseID(resp.ID)))
tx := q.cli.Txn(ctx).If(clientv3util.KeyMissing(coordLockKey)).Then(op)
stat, err := tx.Commit()
if err != nil {
spqrlog.Zero.Error().Err(err).Msg("Failed to commit coordinator lock")
return err
}

if !stat.Succeeded {
return fmt.Errorf("qdb is already in use")
}

_, err = q.cli.Lease.KeepAlive(ctx, resp.ID)
if err != nil {
spqrlog.Zero.Error().Err(err).Msg("Failed to renew lock")
return err
}

return nil
}

// ==============================================================================
// ROUTERS
// ==============================================================================
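The TryCoordinatorLock implementation above follows a common etcd leader-election pattern: grant a lease with a short TTL (3 seconds here), put the coordinator_exists key inside a transaction guarded by KeyMissing so only one holder can create it, and keep the lease alive for as long as the process runs. When the holder dies, the lease expires, the key disappears, and a standby coordinator can take the lock. A standalone sketch of the same pattern, assuming a local etcd endpoint — everything except the key/value names is illustrative:

```go
package main

import (
	"context"
	"fmt"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/client/v3/clientv3util"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"localhost:2379"}, // assumed local etcd
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	ctx := context.Background()

	// Grant a short-lived lease; the lock key dies with it.
	lease, err := cli.Grant(ctx, 3)
	if err != nil {
		panic(err)
	}

	// Write the key only if no other holder has already created it.
	txn, err := cli.Txn(ctx).
		If(clientv3util.KeyMissing("coordinator_exists")).
		Then(clientv3.OpPut("coordinator_exists", "exists", clientv3.WithLease(lease.ID))).
		Commit()
	if err != nil {
		panic(err)
	}
	if !txn.Succeeded {
		fmt.Println("another coordinator holds the lock")
		return
	}

	// Renew the lease while this process is alive (a long-running process
	// would also consume the returned channel).
	if _, err := cli.KeepAlive(ctx, lease.ID); err != nil {
		panic(err)
	}
	fmt.Println("coordinator lock acquired")
}
```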
8 changes: 8 additions & 0 deletions qdb/memqdb.go
@@ -465,6 +465,14 @@ func (q *MemQDB) RemoveTransferTx(ctx context.Context, key string) error {
return ExecuteCommands(q.DumpState, NewDeleteCommand(q.Transactions, key))
}

// ==============================================================================
// COORDINATOR LOCK
// ==============================================================================

func (q *MemQDB) TryCoordinatorLock(ctx context.Context) error {
return nil
}

// ==============================================================================
// ROUTERS
// ==============================================================================
2 changes: 2 additions & 0 deletions qdb/qdb.go
@@ -82,6 +82,8 @@ type XQDB interface {
// data move state
ShardingSchemaKeeper
DistributedXactKepper

TryCoordinatorLock(ctx context.Context) error
}

func NewXQDB(qdbType string) (XQDB, error) {
16 changes: 16 additions & 0 deletions test/feature/docker-compose.yaml
@@ -81,6 +81,22 @@ services:
- "router"
- "router2"
- "qdb01"

coordinator2:
build:
dockerfile: ./docker/coordinator/Dockerfile
context: ../../
ports:
- "7004:7002"
- "7005:7003"
environment:
- COORDINATOR_CONFIG=${COORDINATOR_CONFIG}
hostname: regress_coordinator
container_name: regress_coordinator2
depends_on:
- "router"
- "router2"
- "qdb01"

qdb01:
image: 'bitnami/etcd:latest'
1 change: 1 addition & 0 deletions test/feature/features/coordinator.feature
@@ -404,6 +404,7 @@ Feature: Coordinator test
#
# Coordinator is Up
#
Given host "coordinator2" is stopped
When I run SQL on host "coordinator"
"""
CREATE KEY RANGE krid3 FROM 31 TO 40 ROUTE TO sh1
9 changes: 3 additions & 6 deletions test/feature/features/move_recover.feature
@@ -42,8 +42,7 @@ Feature: Move recover test
002
"""
Given host "coordinator" is stopped
Given host "coordinator" is started
When I execute SQL on host "coordinator"
When I execute SQL on host "coordinator2"
"""
SHOW routers
"""
@@ -116,8 +115,7 @@
002
"""
Given host "coordinator" is stopped
Given host "coordinator" is started
When I execute SQL on host "coordinator"
When I execute SQL on host "coordinator2"
"""
SHOW routers
"""
@@ -173,8 +171,7 @@
Then command return code should be "0"
And qdb should contain transaction "krid1"
Given host "coordinator" is stopped
Given host "coordinator" is started
When I execute SQL on host "coordinator"
When I execute SQL on host "coordinator2"
"""
SHOW routers
"""
55 changes: 55 additions & 0 deletions test/feature/features/multicoordinator.feature
@@ -0,0 +1,55 @@
Feature: Coordinator test
Background:
Given cluster is up and running
When I run SQL on host "coordinator"
"""
REGISTER ROUTER r1 ADDRESS regress_router::7000;
REGISTER ROUTER r2 ADDRESS regress_router_2::7000
"""
Then command return code should be "0"

Scenario: Second coordinator awaits
When I run SQL on host "coordinator"
"""
SHOW routers
"""
Then command return code should be "0"
And SQL result should match regexp
"""
router
"""
When I run SQL on host "coordinator2"
"""
SHOW routers
"""
Then command return code should be "1"

Scenario: Second coordinator turns on when other is dead
Given host "coordinator" is stopped
When I run SQL on host "coordinator2"
"""
SHOW routers
"""
Then command return code should be "0"
And SQL result should match regexp
"""
router
"""

Scenario: first coordinator awaits after recovery
Given host "coordinator" is stopped
When I run SQL on host "coordinator2"
"""
SHOW routers
"""
Then command return code should be "0"
And SQL result should match regexp
"""
router
"""
Given host "coordinator" is started
When I run SQL on host "coordinator"
"""
SHOW routers
"""
Then command return code should be "1"
28 changes: 26 additions & 2 deletions test/feature/spqr_test.go
@@ -246,9 +246,19 @@ func (tctx *testContext) connectorWithCredentials(username string, password stri
func (tctx *testContext) getPostgresqlConnection(host string) (*sqlx.DB, error) {
db, ok := tctx.dbs[host]
if !ok {
return nil, fmt.Errorf("postgresql %s is not in cluster", host)
addr, err := tctx.composer.GetAddr(host, coordinatorPort)
if err != nil {
return nil, fmt.Errorf("postgresql %s is not in cluster", host)
}

db, err := tctx.connectCoordinatorWithCredentials(shardUser, shardPassword, addr, postgresqlInitialConnectTimeout)
if err != nil {
return nil, fmt.Errorf("postgresql %s is not in cluster", host)
}
tctx.dbs[host] = db
return db, nil
}
if strings.HasSuffix(host, "admin") || host == "coordinator" {
if strings.HasSuffix(host, "admin") || strings.HasPrefix(host, "coordinator") {
return db, nil
}
err := db.Ping()
@@ -371,6 +381,17 @@ func (tctx *testContext) stepClusterIsUpAndRunning(createHaNodes bool) error {
return fmt.Errorf("failed to setup compose cluster: %s", err)
}

err = tctx.stepHostIsStopped("coordinator2")
if err != nil {
return err
}
defer func() {
err := tctx.stepHostIsStarted("coordinator2")
if err != nil {
spqrlog.Zero.Error().Err(err).Msg("failed to start second coordinator")
}
}()

// check databases
for _, service := range tctx.composer.Services() {
if strings.HasPrefix(service, spqrShardName) {
@@ -429,6 +450,9 @@ func (tctx *testContext) stepClusterIsUpAndRunning(createHaNodes bool) error {

// check coordinator
for _, service := range tctx.composer.Services() {
if strings.HasPrefix(service, spqrCoordinatorName+"2") {
continue
}
if strings.HasPrefix(service, spqrCoordinatorName) {
addr, err := tctx.composer.GetAddr(service, coordinatorPort)
if err != nil {