Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

les: move client pool to les/vflux/server #22495

Merged
merged 27 commits into from
Apr 6, 2021
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
323cbf4
les: move client pool to les/vflux/server
zsfelfoldi Feb 20, 2021
5409e4c
les/vflux/server: un-expose NodeBalance, remove unused fn, fix bugs
zsfelfoldi Mar 16, 2021
42f5b60
tests/fuzzers/vflux: add ClientPool fuzzer
zsfelfoldi Mar 16, 2021
e84b252
les/vflux/server: fixed balance tests
zsfelfoldi Mar 16, 2021
cda86d9
les: rebase fix
zsfelfoldi Mar 17, 2021
c77a70b
les/vflux/server: fixed more bugs
zsfelfoldi Mar 17, 2021
89c45fa
les/vflux/server: unexported NodeStateMachine fields and flags
zsfelfoldi Mar 17, 2021
ed69bfb
les/vflux/server: unexport all internal components and functions
zsfelfoldi Mar 17, 2021
f8700b5
les/vflux/server: fixed priorityPool test
zsfelfoldi Mar 17, 2021
fc9790e
les/vflux/server: polish balance
rjl493456442 Mar 24, 2021
b63f123
les/vflux/server: fixed mutex locking error
zsfelfoldi Mar 24, 2021
42d08dc
les/vflux/server: priorityPool bug fixed
zsfelfoldi Mar 25, 2021
23ab832
common/prque: make Prque wrap-around priority handling optional
zsfelfoldi Mar 25, 2021
31dd954
les/vflux/server: rename funcs, small optimizations
zsfelfoldi Mar 25, 2021
9332261
les/vflux/server: fixed timeUntil
zsfelfoldi Mar 25, 2021
380d626
les/vflux/server: separated balance.posValue and negValue
zsfelfoldi Mar 25, 2021
b914785
les/vflux/server: polish setup
rjl493456442 Mar 25, 2021
0db4ff4
les/vflux/server: enforce capacity curve monotonicity
zsfelfoldi Mar 29, 2021
7032e9d
les/vflux/server: simplified requestCapacity
zsfelfoldi Mar 29, 2021
f7b4531
les/vflux/server: requestCapacity with target range, no iterations in…
zsfelfoldi Mar 29, 2021
e294fff
les/vflux/server: minor changes
zsfelfoldi Mar 29, 2021
50ddf0e
les/vflux/server: moved default factors to balanceTracker
zsfelfoldi Mar 29, 2021
f1000d6
les/vflux/server: set inactiveFlag in priorityPool
zsfelfoldi Mar 29, 2021
4a56ebe
les/vflux/server: moved related metrics to vfs package
zsfelfoldi Mar 29, 2021
590a834
les/vflux/client: make priorityPool temp state logic cleaner
zsfelfoldi Apr 1, 2021
46a3a27
les/vflux/server: changed log.Crit to log.Error
zsfelfoldi Apr 1, 2021
aca9e3a
add vflux fuzzer to oss-fuzz
zsfelfoldi Apr 1, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions common/prque/lazyqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type (
// NewLazyQueue creates a new lazy queue
func NewLazyQueue(setIndex SetIndexCallback, priority PriorityCallback, maxPriority MaxPriorityCallback, clock mclock.Clock, refreshPeriod time.Duration) *LazyQueue {
q := &LazyQueue{
popQueue: newSstack(nil),
popQueue: newSstack(nil, false),
setIndex: setIndex,
priority: priority,
maxPriority: maxPriority,
Expand All @@ -71,8 +71,8 @@ func NewLazyQueue(setIndex SetIndexCallback, priority PriorityCallback, maxPrior

// Reset clears the contents of the queue
func (q *LazyQueue) Reset() {
q.queue[0] = newSstack(q.setIndex0)
q.queue[1] = newSstack(q.setIndex1)
q.queue[0] = newSstack(q.setIndex0, false)
q.queue[1] = newSstack(q.setIndex1, false)
}

// Refresh performs queue re-evaluation if necessary
Expand Down
7 changes: 6 additions & 1 deletion common/prque/prque.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,12 @@ type Prque struct {

// New creates a new priority queue.
func New(setIndex SetIndexCallback) *Prque {
return &Prque{newSstack(setIndex)}
return &Prque{newSstack(setIndex, false)}
}

// NewWrapAround creates a new priority queue with wrap-around priority handling.
func NewWrapAround(setIndex SetIndexCallback) *Prque {
return &Prque{newSstack(setIndex, true)}
}

// Pushes a value with a given priority into the queue, expanding if necessary.
Expand Down
20 changes: 13 additions & 7 deletions common/prque/sstack.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,24 @@ type SetIndexCallback func(data interface{}, index int)
// the stack (heap) functionality and the Len, Less and Swap methods for the
// sortability requirements of the heaps.
type sstack struct {
setIndex SetIndexCallback
size int
capacity int
offset int
setIndex SetIndexCallback
size int
capacity int
offset int
wrapAround bool

blocks [][]*item
active []*item
}

// Creates a new, empty stack.
func newSstack(setIndex SetIndexCallback) *sstack {
func newSstack(setIndex SetIndexCallback, wrapAround bool) *sstack {
result := new(sstack)
result.setIndex = setIndex
result.active = make([]*item, blockSize)
result.blocks = [][]*item{result.active}
result.capacity = blockSize
result.wrapAround = wrapAround
return result
}

Expand Down Expand Up @@ -94,7 +96,11 @@ func (s *sstack) Len() int {
// Compares the priority of two elements of the stack (higher is first).
// Required by sort.Interface.
func (s *sstack) Less(i, j int) bool {
return (s.blocks[i/blockSize][i%blockSize].priority - s.blocks[j/blockSize][j%blockSize].priority) > 0
a, b := s.blocks[i/blockSize][i%blockSize].priority, s.blocks[j/blockSize][j%blockSize].priority
if s.wrapAround {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the difference between the a>b and a-b>0?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The way they handle numeric overflows. For example, MaxInt64 > MinInt64 but MaxInt64-MinInt64 < 0. If we use the latter type of comparison then the individual priority values are allowed to overflow and "wrap around" the 64 bit range many times as long as the biggest difference between two priorities in the queue at any moment can be expressed in the int64 range. flowcontrol.ClientManager is a good example for this mode where there is a cumulative "buffer recharge integrator" that is continuously being increased and does overflow. rcQueue item priorities are all calculated relative to this value and the difference between any two item priorities fits safely into int64. The first (free clients only) client pool design also did take advantage of this feature but the current priority pool scheme uses absolute values.
This "wrap around" feature requires care to use and until now all prque.Prque-s operated in this mode. Now it's only used if the queue is explicitly created with the NewWrapAround constructor.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks dangerous to me. Firstly it introduces this trick into stack, prque very implicitly. Besides are we sure that flowcontrol.ClientManager can handle the overflow properly?

cm.rcLastIntValue += int64(bonusRatio * float64(dt))

It's indeed increased all the time. But when it's overflow, all the relative calculations will be affected.

For example

dtNext := mclock.AbsTime(float64(rcqNode.rcFullIntValue-cm.rcLastIntValue) / bonusRatio)

This value can be a large negative value if the rcqNode.rcFullIntValue is overflowed.

// check whether it has already finished
		dtNext := mclock.AbsTime(float64(rcqNode.rcFullIntValue-cm.rcLastIntValue) / bonusRatio)
		if dt < dtNext {
			// not finished yet, put it back, update integrator according
			// to current bonusRatio and return
			cm.rcQueue.Push(rcqNode, -rcqNode.rcFullIntValue)
			cm.rcLastIntValue += int64(bonusRatio * float64(dt))
			return
		}
		lastUpdate += dtNext
		// finished recharging, update corrBufValue and sumRecharge if necessary and do next step
		if rcqNode.corrBufValue < int64(rcqNode.params.BufLimit) {
			rcqNode.corrBufValue = int64(rcqNode.params.BufLimit)
			cm.sumRecharge -= rcqNode.params.MinRecharge
		}
		cm.rcLastIntValue = rcqNode.rcFullIntValue

Then the lastUpdate will also be the large negative.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, we should fix the overflow in the flowcontrol manager instead of introducing the tricks into the common data structure.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR reverts to the non-tricky version in most of the cases and does not change the behavior of the client manager. I will change client manager's priority calculation and get rid of thip wrap-around trick altogether in a next PR.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

return a-b > 0
}
return a > b
}

// Swaps two elements in the stack. Required by sort.Interface.
Expand All @@ -110,5 +116,5 @@ func (s *sstack) Swap(i, j int) {

// Resets the stack, effectively clearing its contents.
func (s *sstack) Reset() {
*s = *newSstack(s.setIndex)
*s = *newSstack(s.setIndex, false)
}
6 changes: 3 additions & 3 deletions common/prque/sstack_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func TestSstack(t *testing.T) {
for i := 0; i < size; i++ {
data[i] = &item{rand.Int(), rand.Int63()}
}
stack := newSstack(nil)
stack := newSstack(nil, false)
for rep := 0; rep < 2; rep++ {
// Push all the data into the stack, pop out every second
secs := []*item{}
Expand Down Expand Up @@ -55,7 +55,7 @@ func TestSstackSort(t *testing.T) {
data[i] = &item{rand.Int(), int64(i)}
}
// Push all the data into the stack
stack := newSstack(nil)
stack := newSstack(nil, false)
for _, val := range data {
stack.Push(val)
}
Expand All @@ -76,7 +76,7 @@ func TestSstackReset(t *testing.T) {
for i := 0; i < size; i++ {
data[i] = &item{rand.Int(), rand.Int63()}
}
stack := newSstack(nil)
stack := newSstack(nil, false)
for rep := 0; rep < 2; rep++ {
// Push all the data into the stack, pop out every second
secs := []*item{}
Expand Down
103 changes: 54 additions & 49 deletions les/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ var (
errNoCheckpoint = errors.New("no local checkpoint provided")
errNotActivated = errors.New("checkpoint registrar is not activated")
errUnknownBenchmarkType = errors.New("unknown benchmark type")
errNoPriority = errors.New("priority too low to raise capacity")
)

// PrivateLightServerAPI provides an API to access the LES light server.
Expand All @@ -44,8 +43,8 @@ type PrivateLightServerAPI struct {
func NewPrivateLightServerAPI(server *LesServer) *PrivateLightServerAPI {
return &PrivateLightServerAPI{
server: server,
defaultPosFactors: server.clientPool.defaultPosFactors,
defaultNegFactors: server.clientPool.defaultNegFactors,
defaultPosFactors: defaultPosFactors,
defaultNegFactors: defaultNegFactors,
}
}

Expand All @@ -66,7 +65,9 @@ func (api *PrivateLightServerAPI) ServerInfo() map[string]interface{} {
res := make(map[string]interface{})
res["minimumCapacity"] = api.server.minCapacity
res["maximumCapacity"] = api.server.maxCapacity
res["totalCapacity"], res["totalConnectedCapacity"], res["priorityConnectedCapacity"] = api.server.clientPool.capacityInfo()
_, res["totalCapacity"] = api.server.clientPool.Limits()
_, res["totalConnectedCapacity"] = api.server.clientPool.Active()
res["priorityConnectedCapacity"] = 0 //TODO connect when token sale module is added
return res
}

Expand All @@ -80,9 +81,18 @@ func (api *PrivateLightServerAPI) ClientInfo(nodes []string) map[enode.ID]map[st
}

res := make(map[enode.ID]map[string]interface{})
api.server.clientPool.forClients(ids, func(client *clientInfo) {
res[client.node.ID()] = api.clientInfo(client)
})
if len(ids) == 0 {
ids = api.server.peers.ids()
}
for _, id := range ids {
if peer := api.server.peers.peer(id); peer != nil {
res[id] = api.clientInfo(peer, peer.balance)
} else {
api.server.clientPool.BalanceOperation(id, "", func(balance vfs.AtomicBalanceOperator) {
res[id] = api.clientInfo(nil, balance)
})
}
}
return res
}

Expand All @@ -94,39 +104,43 @@ func (api *PrivateLightServerAPI) ClientInfo(nodes []string) map[enode.ID]map[st
// assigned to it.
func (api *PrivateLightServerAPI) PriorityClientInfo(start, stop enode.ID, maxCount int) map[enode.ID]map[string]interface{} {
res := make(map[enode.ID]map[string]interface{})
ids := api.server.clientPool.bt.GetPosBalanceIDs(start, stop, maxCount+1)
ids := api.server.clientPool.GetPosBalanceIDs(start, stop, maxCount+1)
if len(ids) > maxCount {
res[ids[maxCount]] = make(map[string]interface{})
ids = ids[:maxCount]
}
if len(ids) != 0 {
api.server.clientPool.forClients(ids, func(client *clientInfo) {
res[client.node.ID()] = api.clientInfo(client)
})
for _, id := range ids {
if peer := api.server.peers.peer(id); peer != nil {
res[id] = api.clientInfo(peer, peer.balance)
} else {
api.server.clientPool.BalanceOperation(id, "", func(balance vfs.AtomicBalanceOperator) {
res[id] = api.clientInfo(nil, balance)
})
}
}
return res
}

// clientInfo creates a client info data structure
func (api *PrivateLightServerAPI) clientInfo(c *clientInfo) map[string]interface{} {
func (api *PrivateLightServerAPI) clientInfo(peer *clientPeer, balance vfs.ReadOnlyBalance) map[string]interface{} {
info := make(map[string]interface{})
pb, nb := c.balance.GetBalance()
info["isConnected"] = c.connected
pb, nb := balance.GetBalance()
info["isConnected"] = peer != nil
info["pricing/balance"] = pb
info["priority"] = pb != 0
// cb := api.server.clientPool.ndb.getCurrencyBalance(id)
// info["pricing/currency"] = cb.amount
if c.connected {
info["connectionTime"] = float64(mclock.Now()-c.connectedAt) / float64(time.Second)
info["capacity"], _ = api.server.clientPool.ns.GetField(c.node, priorityPoolSetup.CapacityField).(uint64)
if peer != nil {
info["connectionTime"] = float64(mclock.Now()-peer.connectedAt) / float64(time.Second)
info["capacity"] = peer.getCapacity()
info["pricing/negBalance"] = nb
}
return info
}

// setParams either sets the given parameters for a single connected client (if specified)
// or the default parameters applicable to clients connected in the future
func (api *PrivateLightServerAPI) setParams(params map[string]interface{}, client *clientInfo, posFactors, negFactors *vfs.PriceFactors) (updateFactors bool, err error) {
func (api *PrivateLightServerAPI) setParams(params map[string]interface{}, client *clientPeer, posFactors, negFactors *vfs.PriceFactors) (updateFactors bool, err error) {
defParams := client == nil
for name, value := range params {
errValue := func() error {
Expand Down Expand Up @@ -156,9 +170,8 @@ func (api *PrivateLightServerAPI) setParams(params map[string]interface{}, clien
setFactor(&negFactors.RequestFactor)
case !defParams && name == "capacity":
if capacity, ok := value.(float64); ok && uint64(capacity) >= api.server.minCapacity {
_, err = api.server.clientPool.setCapacity(client.node, client.address, uint64(capacity), 0, true)
// Don't have to call factor update explicitly. It's already done
// in setCapacity function.
_, err = api.server.clientPool.SetCapacity(client.Node(), uint64(capacity), 0, false)
// time factor recalculation is performed automatically by the balance tracker
} else {
err = errValue()
}
Expand All @@ -179,39 +192,33 @@ func (api *PrivateLightServerAPI) setParams(params map[string]interface{}, clien
// SetClientParams sets client parameters for all clients listed in the ids list
// or all connected clients if the list is empty
func (api *PrivateLightServerAPI) SetClientParams(nodes []string, params map[string]interface{}) error {
var (
ids []enode.ID
err error
)
var err error
for _, node := range nodes {
if id, err := parseNode(node); err != nil {
var id enode.ID
if id, err = parseNode(node); err != nil {
return err
} else {
ids = append(ids, id)
}
}
api.server.clientPool.forClients(ids, func(client *clientInfo) {
if client.connected {
posFactors, negFactors := client.balance.GetPriceFactors()
update, e := api.setParams(params, client, &posFactors, &negFactors)
if peer := api.server.peers.peer(id); peer != nil {
posFactors, negFactors := peer.balance.GetPriceFactors()
update, e := api.setParams(params, peer, &posFactors, &negFactors)
if update {
client.balance.SetPriceFactors(posFactors, negFactors)
peer.balance.SetPriceFactors(posFactors, negFactors)
}
if e != nil {
err = e
}
} else {
err = fmt.Errorf("client %064x is not connected", client.node.ID())
err = fmt.Errorf("client %064x is not connected", id)
}
})
}
return err
}

// SetDefaultParams sets the default parameters applicable to clients connected in the future
func (api *PrivateLightServerAPI) SetDefaultParams(params map[string]interface{}) error {
update, err := api.setParams(params, nil, &api.defaultPosFactors, &api.defaultNegFactors)
if update {
api.server.clientPool.setDefaultFactors(api.defaultPosFactors, api.defaultNegFactors)
api.server.clientPool.SetDefaultFactors(api.defaultPosFactors, api.defaultNegFactors)
}
return err
}
Expand All @@ -224,7 +231,7 @@ func (api *PrivateLightServerAPI) SetConnectedBias(bias time.Duration) error {
if bias < time.Duration(0) {
return fmt.Errorf("bias illegal: %v less than 0", bias)
}
api.server.clientPool.setConnectedBias(bias)
api.server.clientPool.SetConnectedBias(bias)
return nil
}

Expand All @@ -235,8 +242,8 @@ func (api *PrivateLightServerAPI) AddBalance(node string, amount int64) (balance
if id, err = parseNode(node); err != nil {
return
}
api.server.clientPool.forClients([]enode.ID{id}, func(c *clientInfo) {
balance[0], balance[1], err = c.balance.AddBalance(amount)
api.server.clientPool.BalanceOperation(id, "", func(nb vfs.AtomicBalanceOperator) {
balance[0], balance[1], err = nb.AddBalance(amount)
})
return
}
Expand Down Expand Up @@ -338,14 +345,12 @@ func (api *PrivateDebugAPI) FreezeClient(node string) error {
if id, err = parseNode(node); err != nil {
return err
}
api.server.clientPool.forClients([]enode.ID{id}, func(c *clientInfo) {
if c.connected {
c.peer.freeze()
} else {
err = fmt.Errorf("client %064x is not connected", id[:])
}
})
return err
if peer := api.server.peers.peer(id); peer != nil {
peer.freeze()
return nil
} else {
return fmt.Errorf("client %064x is not connected", id[:])
}
}

// PrivateLightAPI provides an API to access the LES light server or light client.
Expand Down
Loading