Skip to content

Commit

Permalink
fix: modify the traversal condition for Items (#2250)
Browse files Browse the repository at this point in the history
Signed-off-by: XZ <834756128@qq.com>
Signed-off-by: Gaius <gaius.qi@gmail.com>
Co-authored-by: Gaius <gaius.qi@gmail.com>
  • Loading branch information
fcgxz2003 and gaius-qi authored Apr 10, 2023
1 parent d13acdd commit c8b0e89
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 31 deletions.
10 changes: 6 additions & 4 deletions scheduler/networktopology/probes.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,20 +146,22 @@ func (p *probes) Enqueue(probe *Probe) error {
p.items.PushBack(probe)

// Calculate the average RTT.
var averageRTT time.Duration
for e := p.items.Front(); e != nil; e = e.Next() {
probe, ok := e.Value.(*Probe)
if !ok {
return errors.New("invalid probe")
}

if p.items.Len() == 1 {
p.averageRTT.Store(probe.RTT)
if e == p.items.Front() {
averageRTT = probe.RTT
continue
}

p.averageRTT.Store(time.Duration(float64(p.averageRTT.Load())*DefaultMovingAverageWeight +
float64(probe.RTT)*(1-DefaultMovingAverageWeight)))
averageRTT = time.Duration(float64(averageRTT)*DefaultMovingAverageWeight +
float64(probe.RTT)*(1-DefaultMovingAverageWeight))
}
p.averageRTT.Store(averageRTT)

p.updatedAt = atomic.NewTime(probe.CreatedAt)
return nil
Expand Down
95 changes: 68 additions & 27 deletions scheduler/networktopology/probes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ var (

mockProbe = &Probe{
Host: mockHost,
RTT: 30 * time.Nanosecond,
RTT: 3000000 * time.Nanosecond,
CreatedAt: time.Now(),
}

Expand Down Expand Up @@ -227,11 +227,11 @@ func TestProbes_Peek(t *testing.T) {
t.Fatal(err)
}

if err := probes.Enqueue(NewProbe(mockHost, 31*time.Nanosecond, time.Now())); err != nil {
if err := probes.Enqueue(NewProbe(mockHost, 3100000*time.Nanosecond, time.Now())); err != nil {
t.Fatal(err)
}

if err := probes.Enqueue(NewProbe(mockHost, 32*time.Nanosecond, time.Now())); err != nil {
if err := probes.Enqueue(NewProbe(mockHost, 3200000*time.Nanosecond, time.Now())); err != nil {
t.Fatal(err)
}
},
Expand Down Expand Up @@ -288,38 +288,72 @@ func TestProbes_Enqueue(t *testing.T) {
assert.Equal(p.AverageRTT(), probe.RTT)
},
},
{
name: "enqueue five probes",
probes: NewProbes(mockQueueLength, mockSeedHost),
mock: func(probes Probes) {
if err := probes.Enqueue(mockProbe); err != nil {
t.Fatal(err)
}

if err := probes.Enqueue(NewProbe(mockHost, 3100000*time.Nanosecond, time.Now())); err != nil {
t.Fatal(err)
}

if err := probes.Enqueue(NewProbe(mockHost, 3200000*time.Nanosecond, time.Now())); err != nil {
t.Fatal(err)
}

if err := probes.Enqueue(NewProbe(mockHost, 3300000*time.Nanosecond, time.Now())); err != nil {
t.Fatal(err)
}

if err := probes.Enqueue(NewProbe(mockHost, 3400000*time.Nanosecond, time.Now())); err != nil {
t.Fatal(err)
}
},
expect: func(t *testing.T, p Probes) {
assert := assert.New(t)
assert.Equal(p.Length(), 5)
assert.Equal(p.AverageRTT().Nanoseconds(), int64(3388890))

probe, peeked := p.Peek()
assert.True(peeked)
assert.EqualValues(probe, mockProbe)
},
},
{
name: "enqueue six probes",
probes: NewProbes(mockQueueLength, mockSeedHost),
mock: func(probes Probes) {
if err := probes.Enqueue(NewProbe(mockHost, 34*time.Nanosecond, time.Now())); err != nil {
if err := probes.Enqueue(NewProbe(mockHost, 3400000*time.Nanosecond, time.Now())); err != nil {
t.Fatal(err)
}

if err := probes.Enqueue(mockProbe); err != nil {
t.Fatal(err)
}

if err := probes.Enqueue(NewProbe(mockHost, 31*time.Nanosecond, time.Now())); err != nil {
if err := probes.Enqueue(NewProbe(mockHost, 3100000*time.Nanosecond, time.Now())); err != nil {
t.Fatal(err)
}

if err := probes.Enqueue(NewProbe(mockHost, 32*time.Nanosecond, time.Now())); err != nil {
if err := probes.Enqueue(NewProbe(mockHost, 3200000*time.Nanosecond, time.Now())); err != nil {
t.Fatal(err)
}

if err := probes.Enqueue(NewProbe(mockHost, 33*time.Nanosecond, time.Now())); err != nil {
if err := probes.Enqueue(NewProbe(mockHost, 3300000*time.Nanosecond, time.Now())); err != nil {
t.Fatal(err)
}

if err := probes.Enqueue(NewProbe(mockHost, 34*time.Nanosecond, time.Now())); err != nil {
if err := probes.Enqueue(NewProbe(mockHost, 3400000*time.Nanosecond, time.Now())); err != nil {
t.Fatal(err)
}
},
expect: func(t *testing.T, p Probes) {
assert := assert.New(t)
assert.Equal(p.Length(), 5)
assert.Equal(p.AverageRTT().Nanoseconds(), int64(33))
assert.Equal(p.AverageRTT().Nanoseconds(), int64(3388890))

probe, peeked := p.Peek()
assert.True(peeked)
Expand Down Expand Up @@ -413,7 +447,7 @@ func TestProbes_Items(t *testing.T) {
t.Fatal(err)
}

if err := probes.Enqueue(NewProbe(mockHost, 31*time.Nanosecond, time.Now())); err != nil {
if err := probes.Enqueue(NewProbe(mockHost, 3100000*time.Nanosecond, time.Now())); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -481,7 +515,7 @@ func TestProbes_Length(t *testing.T) {
t.Fatal(err)
}

if err := probes.Enqueue(NewProbe(mockHost, 31*time.Nanosecond, time.Now())); err != nil {
if err := probes.Enqueue(NewProbe(mockHost, 3100000*time.Nanosecond, time.Now())); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -558,11 +592,11 @@ func TestProbes_UpdatedAt(t *testing.T) {
name: "enqueue three probe",
probes: NewProbes(mockQueueLength, mockSeedHost),
mock: func(probes Probes) {
if err := probes.Enqueue(NewProbe(mockHost, 10*time.Nanosecond, time.Now())); err != nil {
if err := probes.Enqueue(NewProbe(mockHost, 1000000*time.Nanosecond, time.Now())); err != nil {
t.Fatal(err)
}

if err := probes.Enqueue(NewProbe(mockHost, 100*time.Nanosecond, time.Now())); err != nil {
if err := probes.Enqueue(NewProbe(mockHost, 10000000*time.Nanosecond, time.Now())); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -615,57 +649,64 @@ func TestProbes_AverageRTT(t *testing.T) {
},
},
{
name: "queue has three probe",
name: "queue has five probe",
probes: NewProbes(mockQueueLength, mockSeedHost),
mock: func(probes Probes) {
if err := probes.Enqueue(NewProbe(mockHost, 10*time.Nanosecond, time.Now())); err != nil {
if err := probes.Enqueue(NewProbe(mockHost, 3000000*time.Nanosecond, time.Now())); err != nil {
t.Fatal(err)
}

if err := probes.Enqueue(NewProbe(mockHost, 3100000*time.Nanosecond, time.Now())); err != nil {
t.Fatal(err)
}

if err := probes.Enqueue(NewProbe(mockHost, 100*time.Nanosecond, time.Now())); err != nil {
if err := probes.Enqueue(NewProbe(mockHost, 3200000*time.Nanosecond, time.Now())); err != nil {
t.Fatal(err)
}

if err := probes.Enqueue(NewProbe(mockHost, 30*time.Nanosecond, time.Now())); err != nil {
if err := probes.Enqueue(NewProbe(mockHost, 3300000*time.Nanosecond, time.Now())); err != nil {
t.Fatal(err)
}

if err := probes.Enqueue(NewProbe(mockHost, 3400000*time.Nanosecond, time.Now())); err != nil {
t.Fatal(err)
}
},
expect: func(t *testing.T, averageRTT time.Duration) {
assert := assert.New(t)
assert.Equal(averageRTT.Nanoseconds(), int64(36))
assert.Equal(averageRTT.Nanoseconds(), int64(3388890))
},
},
{
name: "qeueu has six probe",
name: "queue has six probe",
probes: NewProbes(mockQueueLength, mockSeedHost),
mock: func(probes Probes) {
if err := probes.Enqueue(NewProbe(mockHost, 30*time.Nanosecond, time.Now())); err != nil {
if err := probes.Enqueue(NewProbe(mockHost, 3400000*time.Nanosecond, time.Now())); err != nil {
t.Fatal(err)
}

if err := probes.Enqueue(NewProbe(mockHost, 30*time.Nanosecond, time.Now())); err != nil {
if err := probes.Enqueue(NewProbe(mockHost, 3000000*time.Nanosecond, time.Now())); err != nil {
t.Fatal(err)
}

if err := probes.Enqueue(NewProbe(mockHost, 30*time.Nanosecond, time.Now())); err != nil {
if err := probes.Enqueue(NewProbe(mockHost, 3100000*time.Nanosecond, time.Now())); err != nil {
t.Fatal(err)
}

if err := probes.Enqueue(NewProbe(mockHost, 30*time.Nanosecond, time.Now())); err != nil {
if err := probes.Enqueue(NewProbe(mockHost, 3200000*time.Nanosecond, time.Now())); err != nil {
t.Fatal(err)
}

if err := probes.Enqueue(NewProbe(mockHost, 30*time.Nanosecond, time.Now())); err != nil {
if err := probes.Enqueue(NewProbe(mockHost, 3300000*time.Nanosecond, time.Now())); err != nil {
t.Fatal(err)
}

if err := probes.Enqueue(NewProbe(mockHost, 100*time.Nanosecond, time.Now())); err != nil {
if err := probes.Enqueue(NewProbe(mockHost, 3400000*time.Nanosecond, time.Now())); err != nil {
t.Fatal(err)
}
},
expect: func(t *testing.T, averageRTT time.Duration) {
assert := assert.New(t)
assert.Equal(averageRTT.Nanoseconds(), int64(93))
assert.Equal(averageRTT.Nanoseconds(), int64(3388890))
},
},
{
Expand Down

0 comments on commit c8b0e89

Please sign in to comment.