Skip to content

Commit

Permalink
Improve filter rules reloading (#605)
Browse files Browse the repository at this point in the history
* fix: reload filter when vttablet just stat

* fix: synchronize the filter create, drop, alter process

* fix: concurrency control

* fix: comment useless code
  • Loading branch information
newborn22 authored Dec 18, 2024
1 parent 30da642 commit 5336f06
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 15 deletions.
2 changes: 1 addition & 1 deletion go/vt/vttablet/customrule/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ var (
DatabaseCustomRuleDbName = sidecardb.SidecarDBName
DatabaseCustomRuleTableName = "wescale_plugin"
DatabaseCustomRuleReloadInterval = 60 * time.Second
DatabaseCustomRuleNotifierDelayTime = 100 * time.Millisecond
DatabaseCustomRuleNotifierDelayTime = 100 * time.Millisecond // time to wait for primary inserting filters and the filters are synced to replicas
)

func registerFlags(fs *pflag.FlagSet) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package databasecustomrule

import (
"context"
"sync"

"fmt"
"reflect"
Expand All @@ -27,6 +28,8 @@ const databaseCustomRuleSource string = "DATABASE_CUSTOM_RULE"

// databaseCustomRule is the database backed implementation.
type databaseCustomRule struct {
mu sync.Mutex

// controller is set at construction time.
controller tabletserver.Controller

Expand All @@ -45,13 +48,18 @@ func newDatabaseCustomRule(qsc tabletserver.Controller) (*databaseCustomRule, er

func (cr *databaseCustomRule) start() {
go func() {
// reload rules that already in the database once start
if err := cr.reloadRulesFromDatabase(); err != nil {
log.Warningf("Background watch of database custom rule failed: %v", err)
}

intervalTicker := time.NewTicker(customrule.DatabaseCustomRuleReloadInterval)
defer intervalTicker.Stop()

for {
select {
case <-intervalTicker.C:
case <-customrule.Watch():
//case <-customrule.Watch():
}

if err := cr.reloadRulesFromDatabase(); err != nil {
Expand Down Expand Up @@ -84,6 +92,9 @@ func (cr *databaseCustomRule) applyRules(qr *sqltypes.Result) error {
qrs.Add(rule)
}

cr.mu.Lock()
defer cr.mu.Unlock()

if !reflect.DeepEqual(cr.qrs, qrs) {
cr.qrs = qrs.Copy()
cr.controller.SetQueryRules(databaseCustomRuleSource, qrs)
Expand Down Expand Up @@ -122,6 +133,7 @@ func activateTopoCustomRules(qsc tabletserver.Controller) {
if err != nil {
log.Fatalf("cannot start DatabaseCustomRule: %v", err)
}
customrule.WaitForFilter = cr.WaitForFilter
cr.start()

servenv.OnTerm(cr.stop)
Expand All @@ -131,3 +143,39 @@ func activateTopoCustomRules(qsc tabletserver.Controller) {
func init() {
tabletserver.RegisterFunctions = append(tabletserver.RegisterFunctions, activateTopoCustomRules)
}

func (cr *databaseCustomRule) WaitForFilter(name string, shouldExists bool) error {
timeoutDuration := 5 * time.Second
interval := 100 * time.Millisecond

timeout := time.After(timeoutDuration)
ticker := time.NewTicker(interval)
defer ticker.Stop()

err := cr.reloadRulesFromDatabase()
if err != nil {
return fmt.Errorf("failed to reload rules from database: %v", err)
}

for {
select {
case <-timeout:
return fmt.Errorf("wait for filter reload timeout")
case <-ticker.C:
filter := cr.FindFilter(name)
if shouldExists && filter != nil || !shouldExists && filter == nil {
return nil
}
err := cr.reloadRulesFromDatabase()
if err != nil {
return fmt.Errorf("failed to reload rules from database: %v", err)
}
}
}
}

func (cr *databaseCustomRule) FindFilter(name string) *rules.Rule {
cr.mu.Lock()
defer cr.mu.Unlock()
return cr.qrs.Find(name)
}
2 changes: 2 additions & 0 deletions go/vt/vttablet/customrule/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,5 @@ func NotifyReload() {
func Watch() <-chan struct{} {
return customRuleChanged
}

var WaitForFilter func(name string, shouldExists bool) error
54 changes: 41 additions & 13 deletions go/vt/vttablet/tabletserver/common_query_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"fmt"
"strconv"
"time"
"vitess.io/vitess/go/vt/vttablet/tabletserver/rules"

"vitess.io/vitess/go/vt/vttablet/customrule"

Expand Down Expand Up @@ -82,34 +83,61 @@ func (qe *QueryEngine) TabletsPlans(alias *topodatapb.TabletAlias) (*sqltypes.Re
}

func (qe *QueryEngine) HandleWescaleFilterRequest(sql string, isPrimary bool) (*sqltypes.Result, error) {

stmt, _, err := sqlparser.Parse2(sql)
if err != nil {
return nil, err
}

switch s := stmt.(type) {
case *sqlparser.CreateWescaleFilter:
defer customrule.NotifyReload()
if !isPrimary {
rst := &sqltypes.Result{}
if isPrimary {
rst, err = qe.HandleCreateFilter(s)
if err != nil {
return nil, err
}
} else {
// wait for primary inserting filters and the filters are synced to replicas
time.Sleep(customrule.DatabaseCustomRuleNotifierDelayTime)
return nil, nil
}
return qe.HandleCreateFilter(s)

customrule.WaitForFilter(s.Name, true)
return rst, nil

case *sqlparser.AlterWescaleFilter:
defer customrule.NotifyReload()
if !isPrimary {
rst := &sqltypes.Result{}
if isPrimary {
rst, err = qe.HandleAlterFilter(s)
if err != nil {
return nil, err
}
} else {
// wait for primary inserting filters and the filters are synced to replicas
time.Sleep(customrule.DatabaseCustomRuleNotifierDelayTime)
return nil, nil
}
return qe.HandleAlterFilter(s)

nameToWait := s.AlterInfo.Name
if nameToWait == rules.UnsetValueOfStmt {
nameToWait = s.OriginName
}
customrule.WaitForFilter(nameToWait, true)
return rst, nil

case *sqlparser.DropWescaleFilter:
defer customrule.NotifyReload()
if !isPrimary {
rst := &sqltypes.Result{}
if isPrimary {
rst, err = qe.HandleDropFilter(s)
if err != nil {
return nil, err
}
} else {
// wait for primary inserting filters and the filters are synced to replicas
time.Sleep(customrule.DatabaseCustomRuleNotifierDelayTime)
return nil, nil
}
return qe.HandleDropFilter(s)

customrule.WaitForFilter(s.Name, false)
return rst, nil

case *sqlparser.ShowWescaleFilter:
return qe.HandleShowFilter(s)
}
Expand Down

0 comments on commit 5336f06

Please sign in to comment.