Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PR - MED attribute for inter-AS adv & block-cti sync support #373

Merged
merged 7 commits into from
Aug 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,13 @@ const (

const (
// HighLocalPref - High local preference for advertising BGP route(Default or Master)
HighLocalPref = 101
HighLocalPref = 5000
// LowLocalPref - Low local preference for advertising BGP route(Backup)
LowLocalPref = 100
// HighMed - Low metric means higher probability for selection outiside AS
HighMed = 10
// LowMed - High metric means lower probability for selection outiside AS
LowMed = 20
)

const (
Expand Down
2 changes: 1 addition & 1 deletion loxilb-ebpf
64 changes: 57 additions & 7 deletions loxinet/dpbroker.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,8 @@ type DpHookInterface interface {
DpCtAdd(w *DpCtInfo) int
DpCtDel(w *DpCtInfo) int
DpCtGetAsync()
DpGetLock()
DpRelLock()
}

// DpPeer - Remote DP Peer information
Expand Down Expand Up @@ -497,7 +499,7 @@ func (dp *DpH) WaitXsyncReady(who string) {
}

// DpXsyncRPC - Routine for syncing connection information with peers
func (dp *DpH) DpXsyncRPC(op DpSyncOpT, cti *DpCtInfo) int {
func (dp *DpH) DpXsyncRPC(op DpSyncOpT, arg interface{}) int {
var reply int
timeout := 2 * time.Second
dp.SyncMtx.Lock()
Expand All @@ -509,6 +511,15 @@ func (dp *DpH) DpXsyncRPC(op DpSyncOpT, cti *DpCtInfo) int {

rpcRetries := 0
rpcErr := false
var cti *DpCtInfo = nil
var blkCti []DpCtInfo

switch na := arg.(type) {
case *DpCtInfo:
cti = na
case []DpCtInfo:
blkCti = na
}

for idx := range mh.dp.Peers {
restartRPC:
Expand All @@ -528,7 +539,11 @@ func (dp *DpH) DpXsyncRPC(op DpSyncOpT, cti *DpCtInfo) int {
reply = 0
rpcCallStr := ""
if op == DpSyncAdd || op == DpSyncBcast {
rpcCallStr = "XSync.DpWorkOnCtAdd"
if len(blkCti) > 0 {
rpcCallStr = "XSync.DpWorkOnBlockCtAdd"
} else {
rpcCallStr = "XSync.DpWorkOnCtAdd"
}
} else if op == DpSyncDelete {
rpcCallStr = "XSync.DpWorkOnCtDelete"
} else if op == DpSyncGet {
Expand All @@ -539,17 +554,28 @@ func (dp *DpH) DpXsyncRPC(op DpSyncOpT, cti *DpCtInfo) int {

var call *rpc.Call
if op == DpSyncAdd || op == DpSyncDelete || op == DpSyncBcast {
if cti == nil {
return -1
}
if op != DpSyncBcast {
if cti == nil && len(blkCti) <= 0 {
return -1
}

var tmpCti *DpCtInfo
if cti == nil {
tmpCti = &blkCti[0]
} else {
tmpCti = cti
}
// FIXME - There is a race condition here
cIState, _ := mh.has.CIStateGetInst(cti.CI)
cIState, _ := mh.has.CIStateGetInst(tmpCti.CI)
if cIState != "MASTER" {
return 0
}
}
call = pe.Client.Go(rpcCallStr, *cti, &reply, make(chan *rpc.Call, 1))
if cti != nil {
call = pe.Client.Go(rpcCallStr, *cti, &reply, make(chan *rpc.Call, 1))
} else {
call = pe.Client.Go(rpcCallStr, blkCti, &reply, make(chan *rpc.Call, 1))
}
} else {
async := 1
call = pe.Client.Go(rpcCallStr, async, &reply, make(chan *rpc.Call, 1))
Expand Down Expand Up @@ -598,6 +624,30 @@ func DpBrokerInit(dph DpHookInterface) *DpH {
return nDp
}

// DpWorkOnBlockCtAdd - Add block CT entries from remote
func (xs *XSync) DpWorkOnBlockCtAdd(blockCtis []DpCtInfo, ret *int) error {
if !mh.ready {
return errors.New("Not-Ready")
}

*ret = 0

mh.dp.DpHooks.DpGetLock()

for _, cti := range blockCtis {

tk.LogIt(tk.LogDebug, "RPC - Block CT Add %s\n", cti.Key())
r := mh.dp.DpHooks.DpCtAdd(&cti)
if r != 0 {
*ret = r
}
}

mh.dp.DpHooks.DpRelLock()

return nil
}

// DpWorkOnCtAdd - Add a CT entry from remote
func (xs *XSync) DpWorkOnCtAdd(cti DpCtInfo, ret *int) error {
if !mh.ready {
Expand Down
21 changes: 19 additions & 2 deletions loxinet/dpebpf_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -1714,6 +1714,7 @@ func dpCTMapChkUpdates() {
defer mh.dpEbpf.mtx.Unlock()
var tact C.struct_dp_ct_tact
var act *C.struct_dp_ct_dat
var blkCti []DpCtInfo

tc := time.Now()
fd := C.llb_map2fd(C.LL_DP_CT_MAP)
Expand Down Expand Up @@ -1746,7 +1747,7 @@ func dpCTMapChkUpdates() {
delete(mh.dpEbpf.ctMap, cti.Key())
mh.dpEbpf.ctMap[goCtEnt.Key()] = goCtEnt
ctStr := goCtEnt.String()
tk.LogIt(tk.LogInfo, "[CT] %s - %s\n", "update", ctStr)
tk.LogIt(tk.LogDebug, "[CT] %s - %s\n", "update", ctStr)
if goCtEnt.CState == "est" {
goCtEnt.XSync = true
goCtEnt.NTs = time.Now()
Expand Down Expand Up @@ -1801,7 +1802,8 @@ func dpCTMapChkUpdates() {
ret = mh.dp.DpXsyncRPC(DpSyncDelete, cti)
cti.Deleted++
} else {
ret = mh.dp.DpXsyncRPC(DpSyncAdd, cti)
blkCti = append(blkCti, *cti)
//ret = mh.dp.DpXsyncRPC(DpSyncAdd, cti)
}
if ret == 0 || cti.Deleted > ctiDeleteSyncRetries {
cti.XSync = false
Expand All @@ -1814,6 +1816,11 @@ func dpCTMapChkUpdates() {
}
}
}

if len(blkCti) > 0 {
tk.LogIt(tk.LogDebug, "[CT] Block Sync - \n")
mh.dp.DpXsyncRPC(DpSyncAdd, blkCti)
}
}

// dpMapNotifierWorker - Work on any map notifications
Expand Down Expand Up @@ -1938,3 +1945,13 @@ func (e *DpEbpfH) DpCtGetAsync() {
// }
// }
}

// DpTakeLock - routine to take underlying DP lock
func (e *DpEbpfH) DpGetLock() {
C.llb_xh_lock()
}

// DpRelLock - routine to release underlying DP lock
func (e *DpEbpfH) DpRelLock() {
C.llb_xh_unlock()
}
Loading