Skip to content

Commit

Permalink
Try to parse funcation application params for non-sharding rule colum…
Browse files Browse the repository at this point in the history
…n. (#451)

* fix

* Fix linter
  • Loading branch information
reshke authored Jan 24, 2024
1 parent 0bd68fc commit 36ed5ed
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 25 deletions.
8 changes: 8 additions & 0 deletions pkg/pool/dbpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/pg-sharding/spqr/pkg/shard"
"github.com/pg-sharding/spqr/pkg/spqrlog"
"github.com/pg-sharding/spqr/pkg/tsa"
"github.com/pg-sharding/spqr/pkg/txstatus"
)

type InstancePoolImpl struct {
Expand Down Expand Up @@ -178,6 +179,13 @@ func (s *InstancePoolImpl) Put(sh shard.Shard) error {
Msg("discarding unsync connection")
return s.pool.Discard(sh)
}
if sh.TxStatus() != txstatus.TXIDLE {
spqrlog.Zero.Error().
Uint("shard", spqrlog.GetPointer(sh)).
Str("txstatus", sh.TxStatus().String()).
Msg("discarding non-idle connection")
return s.pool.Discard(sh)
}
return s.pool.Put(sh)
}

Expand Down
52 changes: 27 additions & 25 deletions router/qrouter/proxy_routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,25 @@ func (qr *ProxyQrouter) RouteKeyWithRanges(ctx context.Context, expr lyx.Node, m
}
}

func (meta *RoutingMetadataContext) RecordShardingColumnValue(alias, colname, value string) {
if !meta.CheckColumnRls(colname) {
spqrlog.Zero.Debug().
Str("colname", colname).
Msg("skip column due no rule mathing")
return
}

resolvedRelation, err := meta.ResolveRelationByAlias(alias)
if err != nil {
// failed to relove relation, skip column
meta.unparsed_columns[colname] = struct{}{}
return
}

// will not work not ints
meta.RecordConstExpr(resolvedRelation, colname, value)
}

// TODO : unit tests
// deparse sharding column-value pair from query Where clause
func (qr *ProxyQrouter) routeByClause(ctx context.Context, expr lyx.Node, meta *RoutingMetadataContext) error {
Expand All @@ -232,48 +251,31 @@ func (qr *ProxyQrouter) routeByClause(ctx context.Context, expr lyx.Node, meta *

alias, colname := lft.TableAlias, lft.ColName

if !meta.CheckColumnRls(colname) {
spqrlog.Zero.Debug().
Str("colname", colname).
Msg("skip column due no rule mathing")
continue
}

resolvedRelation, err := meta.ResolveRelationByAlias(alias)
if err != nil {
// failed to relove relation, skip column
meta.unparsed_columns[colname] = struct{}{}
continue
}

/* simple key-value pair */
switch rght := texpr.Right.(type) {
case *lyx.ParamRef:
if rght.Number > len(meta.params) {
return ComplexQuery
if rght.Number <= len(meta.params) {
meta.RecordShardingColumnValue(alias, colname, string(meta.params[rght.Number-1]))
}

// will not work not ints
meta.RecordConstExpr(resolvedRelation, colname, string(meta.params[rght.Number-1]))

// else error out?
case *lyx.AExprSConst:
// TBD: postpone routing from here to root of parsing tree
meta.RecordConstExpr(resolvedRelation, colname, rght.Value)
meta.RecordShardingColumnValue(alias, colname, rght.Value)
case *lyx.AExprIConst:
// TBD: postpone routing from here to root of parsing tree
// maybe expimely inefficient. Will be fixed in SPQR-2.0
meta.RecordConstExpr(resolvedRelation, colname, fmt.Sprintf("%d", rght.Value))
meta.RecordShardingColumnValue(alias, colname, fmt.Sprintf("%d", rght.Value))
case *lyx.AExprList:
if len(rght.List) != 0 {
expr := rght.List[0]
switch bexpr := expr.(type) {
case *lyx.AExprSConst:
// TBD: postpone routing from here to root of parsing tree
meta.RecordConstExpr(resolvedRelation, colname, bexpr.Value)
meta.RecordShardingColumnValue(alias, colname, bexpr.Value)
case *lyx.AExprIConst:
// TBD: postpone routing from here to root of parsing tree
// maybe expimely inefficient. Will be fixed in SPQR-2.0
meta.RecordConstExpr(resolvedRelation, colname, fmt.Sprintf("%d", bexpr.Value))
meta.RecordShardingColumnValue(alias, colname, fmt.Sprintf("%d", bexpr.Value))
}
}
case *lyx.FuncApplication:
Expand Down Expand Up @@ -342,8 +344,8 @@ func (qr *ProxyQrouter) DeparseSelectStmt(ctx context.Context, selectStmt lyx.No

/* SELECT * FROM VALUES() ... */
case *lyx.ValueClause:
// TODO: process this
meta.ValuesLists = s.Values
return nil
}

return ComplexQuery
Expand Down
28 changes: 28 additions & 0 deletions router/qrouter/proxy_routing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,34 @@ func TestSingleShard(t *testing.T) {
assert.NoError(err)

for _, tt := range []tcase{

{
query: `
DELETE
FROM t
WHERE
j =
any(array(select * from t where i <= 2))
/* __spqr__default_route_behaviour: BLOCK */ returning *;
`,
err: nil,
exp: routingstate.ShardMatchState{
Route: &routingstate.DataShardRoute{
Shkey: kr.ShardKey{
Name: "sh1",
},
Matchedkr: &kr.KeyRange{
ShardID: "sh1",
ID: "id1",
Dataspace: dataspace,
LowerBound: []byte("1"),
UpperBound: []byte("11"),
},
},
TargetSessionAttrs: "any",
},
},

{
query: `
DELETE
Expand Down

0 comments on commit 36ed5ed

Please sign in to comment.