From c099cdbea5111006358aa20563aa9ae7e03c32df Mon Sep 17 00:00:00 2001 From: Kirill Gavrilov Date: Wed, 9 Aug 2023 16:24:40 +0300 Subject: [PATCH 1/2] fix lock/unlock key range in coordinator --- coordinator/provider/coordinator.go | 17 ++++++++++++++++- qdb/etcdqdb.go | 4 ++-- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/coordinator/provider/coordinator.go b/coordinator/provider/coordinator.go index fe1ed85fb..ae2258ed2 100644 --- a/coordinator/provider/coordinator.go +++ b/coordinator/provider/coordinator.go @@ -170,6 +170,12 @@ func DialRouter(r *topology.Router) (*grpc.ClientConn, error) { return grpc.Dial(r.Address, grpc.WithInsecure()) //nolint:all } +type CoordinatorClient interface { + client.Client + + CancelMsg() *pgproto3.CancelRequest +} + type qdbCoordinator struct { coordinator.Coordinator db qdb.QDB @@ -909,13 +915,17 @@ func (qc *qdbCoordinator) UnregisterRouter(ctx context.Context, rID string) erro return qc.db.DeleteRouter(ctx, rID) } -func (qc *qdbCoordinator) PrepareClient(nconn net.Conn) (client.Client, error) { +func (qc *qdbCoordinator) PrepareClient(nconn net.Conn) (CoordinatorClient, error) { cl := psqlclient.NewPsqlClient(nconn) if err := cl.Init(nil); err != nil { return nil, err } + if cl.CancelMsg() != nil { + return cl, nil + } + spqrlog.Zero.Info(). Str("user", cl.Usr()). Str("db", cl.DB()). @@ -946,6 +956,11 @@ func (qc *qdbCoordinator) ProcClient(ctx context.Context, nconn net.Conn) error return err } + if cl.CancelMsg() != nil { + // TODO: cancel client here + return nil + } + ci := grpcConnectionIterator{qdbCoordinator: qc} cli := clientinteractor.NewPSQLInteractor(cl) for { diff --git a/qdb/etcdqdb.go b/qdb/etcdqdb.go index 9d6293b40..b64a76fe6 100644 --- a/qdb/etcdqdb.go +++ b/qdb/etcdqdb.go @@ -364,7 +364,7 @@ func (q *EtcdQDB) LockKeyRange(ctx context.Context, id string) (*KeyRange, error } defer unlockMutex(mu, ctx) - resp, err := q.cli.Get(ctx, keyLockPath(keyRangeID)) + resp, err := q.cli.Get(ctx, keyLockPath(keyRangeNodePath(keyRangeID))) if err != nil { return nil, err } @@ -428,7 +428,7 @@ func (q *EtcdQDB) UnlockKeyRange(ctx context.Context, id string) error { } defer unlockMutex(mu, ctx) - resp, err := q.cli.Get(ctx, keyLockPath(keyRangeID)) + resp, err := q.cli.Get(ctx, keyLockPath(keyRangeNodePath(keyRangeID))) if err != nil { return err } From eca20bf77bab8c731354aa75d66316ff425bf8f6 Mon Sep 17 00:00:00 2001 From: Kirill Gavrilov Date: Wed, 9 Aug 2023 17:50:09 +0300 Subject: [PATCH 2/2] fix e2e tests --- docker/tests/bin/move.sh | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/docker/tests/bin/move.sh b/docker/tests/bin/move.sh index 6ed741130..ab664716b 100755 --- a/docker/tests/bin/move.sh +++ b/docker/tests/bin/move.sh @@ -39,21 +39,11 @@ psql "host=spqr_router_1_1 sslmode=disable user=user1 dbname=db1 port=6432" -c " exit 1 } -psql "host=spqr_coordinator sslmode=disable user=user1 dbname=db1 port=7002" -c "LOCK KEY RANGE krid2;" || { - echo "ERROR: tests failed" - exit 1 -} - psql "host=spqr_coordinator sslmode=disable user=user1 dbname=db1 port=7002" -c "MOVE KEY RANGE krid2 to sh1;" || { echo "ERROR: tests failed" exit 1 } -psql "host=spqr_coordinator sslmode=disable user=user1 dbname=db1 port=7002" -c "UNLOCK KEY RANGE krid2;" || { - echo "ERROR: tests failed" - exit 1 -} - out=$(psql "host=spqr_shard_1 sslmode=disable user=user1 dbname=db1 port=6432" -c "select * from xMove") test "$out" = " w_id | s ------+-----