all: add upstreams avg processing time
schzhn committed Jul 20, 2023
1 parent 187ad0c commit 6124456
Showing 10 changed files with 165 additions and 80 deletions.
12 changes: 4 additions & 8 deletions internal/dnsforward/stats.go
@@ -131,9 +131,6 @@ func (s *Server) logQuery(
s.queryLog.Add(p)
}

// fromCache indicates that response was cached.
const fromCache = "cache"

// updateStats writes the request into statistics.
func (s *Server) updateStats(
ctx *dnsContext,
@@ -142,11 +139,10 @@ func (s *Server) updateStats(
clientIP string,
) {
pctx := ctx.proxyCtx
e := stats.Entry{
Domain: aghnet.NormalizeDomain(pctx.Req.Question[0].Name),
Result: stats.RNotFiltered,
Time: uint32(elapsed / 1000),
Upstream: fromCache,
e := &stats.Entry{
Domain: aghnet.NormalizeDomain(pctx.Req.Question[0].Name),
Result: stats.RNotFiltered,
Time: elapsed,
}

if pctx.Upstream != nil {
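For reference, a minimal sketch of the new call shape on the server side, using a local stand-in type rather than the real internal/stats package: the caller now passes a pointer and the raw time.Duration, and cached answers simply leave Upstream empty instead of using the removed fromCache constant.

package main

import (
    "fmt"
    "time"
)

// entry mirrors the fields of stats.Entry relevant to this change
// (an illustrative stand-in, not the real type).
type entry struct {
    Domain   string
    Upstream string        // left empty for cached responses
    Time     time.Duration // raw duration; no manual /1000 conversion
}

func main() {
    elapsed := 123456 * time.Microsecond

    e := &entry{
        Domain: "example.org.",
        Time:   elapsed,
    }
    fmt.Println(e.Domain, e.Time) // example.org. 123.456ms
}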
4 changes: 2 additions & 2 deletions internal/dnsforward/stats_test.go
@@ -41,11 +41,11 @@ type testStats struct {
// without actually implementing all methods.
stats.Interface

lastEntry stats.Entry
lastEntry *stats.Entry
}

// Update implements the [stats.Interface] interface for *testStats.
func (l *testStats) Update(e stats.Entry) {
func (l *testStats) Update(e *stats.Entry) {
if e.Domain == "" {
return
}
10 changes: 6 additions & 4 deletions internal/stats/http.go
@@ -23,10 +23,12 @@ type topAddrs = map[string]uint64
type StatsResp struct {
TimeUnits string `json:"time_units"`

TopQueried []topAddrs `json:"top_queried_domains"`
TopClients []topAddrs `json:"top_clients"`
TopBlocked []topAddrs `json:"top_blocked_domains"`
TopUpstreams []topAddrs `json:"top_upstreams"`
TopQueried []topAddrs `json:"top_queried_domains"`
TopClients []topAddrs `json:"top_clients"`
TopBlocked []topAddrs `json:"top_blocked_domains"`

TopUpstreamsTotal []topAddrs `json:"top_upstreams_total"`
TopUpstreamsAvgTime []topAddrs `json:"top_upstreams_avg_time"`

DNSQueries []uint64 `json:"dns_queries"`

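A minimal sketch of how the two new fields serialize, using a stand-in struct with the same JSON tags (not the real StatsResp):

package main

import (
    "encoding/json"
    "fmt"
)

type topAddrs = map[string]uint64

// resp carries only the two new fields, for illustration.
type resp struct {
    TopUpstreamsTotal   []topAddrs `json:"top_upstreams_total"`
    TopUpstreamsAvgTime []topAddrs `json:"top_upstreams_avg_time"`
}

func main() {
    b, err := json.Marshal(resp{
        TopUpstreamsTotal:   []topAddrs{{"upstream": 2}},
        TopUpstreamsAvgTime: []topAddrs{{"upstream": 123}},
    })
    if err != nil {
        panic(err)
    }

    fmt.Println(string(b))
    // {"top_upstreams_total":[{"upstream":2}],"top_upstreams_avg_time":[{"upstream":123}]}
}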
8 changes: 4 additions & 4 deletions internal/stats/stats.go
@@ -80,7 +80,7 @@ type Interface interface {
io.Closer

// Update collects the incoming statistics data.
Update(e Entry)
Update(e *Entry)

// GetTopClientIP returns at most limit IP addresses corresponding to the
// clients with the most number of requests.
@@ -256,8 +256,8 @@ func (s *StatsCtx) Close() (err error) {
return udb.flushUnitToDB(tx, s.curr.id)
}

// Update implements the Interface interface for *StatsCtx.
func (s *StatsCtx) Update(e Entry) {
// Update implements the Interface interface for *StatsCtx. e must not be nil.
func (s *StatsCtx) Update(e *Entry) {
s.confMu.Lock()
defer s.confMu.Unlock()

Expand Down Expand Up @@ -285,7 +285,7 @@ func (s *StatsCtx) Update(e Entry) {
e.Client = ip.String()
}

s.curr.add(&e)
s.curr.add(e)
}

// WriteDiskConfig implements the Interface interface for *StatsCtx.
4 changes: 2 additions & 2 deletions internal/stats/stats_internal_test.go
@@ -50,11 +50,11 @@ func TestStats_races(t *testing.T) {
testutil.CleanupAndRequireSuccess(t, s.Close)

writeFunc := func(start, fin *sync.WaitGroup, waitCh <-chan unit, i int) {
e := Entry{
e := &Entry{
Domain: fmt.Sprintf("example-%d.org", i),
Client: fmt.Sprintf("client_%d", i),
Result: Result(i)%(resultLast-1) + 1,
Time: uint32(time.Since(startTime).Milliseconds()),
Time: time.Since(startTime),
}

start.Done()
23 changes: 13 additions & 10 deletions internal/stats/stats_test.go
@@ -9,6 +9,7 @@ import (
"path/filepath"
"sync/atomic"
"testing"
"time"

"github.com/AdguardTeam/AdGuardHome/internal/stats"
"github.com/AdguardTeam/golibs/netutil"
@@ -74,26 +75,27 @@ func TestStats(t *testing.T) {
const reqDomain = "domain"
const respUpstream = "upstream"

entries := []stats.Entry{{
entries := []*stats.Entry{{
Domain: reqDomain,
Client: cliIPStr,
Result: stats.RFiltered,
Time: 123456,
Time: time.Microsecond * 123456,
Upstream: respUpstream,
}, {
Domain: reqDomain,
Client: cliIPStr,
Result: stats.RNotFiltered,
Time: 123456,
Time: time.Microsecond * 123456,
Upstream: respUpstream,
}}

wantData := &stats.StatsResp{
TimeUnits: "hours",
TopQueried: []map[string]uint64{0: {reqDomain: 1}},
TopClients: []map[string]uint64{0: {cliIPStr: 2}},
TopBlocked: []map[string]uint64{0: {reqDomain: 1}},
TopUpstreams: []map[string]uint64{0: {respUpstream: 2}},
TimeUnits: "hours",
TopQueried: []map[string]uint64{0: {reqDomain: 1}},
TopClients: []map[string]uint64{0: {cliIPStr: 2}},
TopBlocked: []map[string]uint64{0: {reqDomain: 1}},
TopUpstreamsTotal: []map[string]uint64{0: {respUpstream: 2}},
TopUpstreamsAvgTime: []map[string]uint64{0: {respUpstream: 123}},
DNSQueries: []uint64{
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2,
@@ -146,7 +148,8 @@ func TestStats(t *testing.T) {
TopQueried: []map[string]uint64{},
TopClients: []map[string]uint64{},
TopBlocked: []map[string]uint64{},
TopUpstreams: []map[string]uint64{},
TopUpstreamsTotal: []map[string]uint64{},
TopUpstreamsAvgTime: []map[string]uint64{},
DNSQueries: _24zeroes[:],
BlockedFiltering: _24zeroes[:],
ReplacedSafebrowsing: _24zeroes[:],
@@ -192,7 +195,7 @@ func TestLargeNumbers(t *testing.T) {

for i := 0; i < cliNumPerHour; i++ {
ip := net.IP{127, 0, byte((i & 0xff00) >> 8), byte(i & 0xff)}
e := stats.Entry{
e := &stats.Entry{
Domain: fmt.Sprintf("domain%d.hour%d", i, h),
Client: ip.String(),
Result: stats.RNotFiltered,
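A note on the expected values in TestStats above: each entry records Time: time.Microsecond * 123456, which unit.add stores per upstream as 123 ms (Milliseconds() truncates 123.456 ms). Two responses from the same upstream therefore give a per-upstream time sum of 246 ms and a total of 2, so the reported average in TopUpstreamsAvgTime is 246 / 2 = 123.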
115 changes: 85 additions & 30 deletions internal/stats/unit.go
@@ -73,8 +73,8 @@ type Entry struct {
// Result is the result of processing the request.
Result Result

// Time is the duration of the request processing in milliseconds.
Time uint32
// Time is the duration of the request processing.
Time time.Duration
}

// unit collects the statistics data for a specific period of time.
@@ -89,9 +89,12 @@ type unit struct {
// clients stores the number of requests from each client.
clients map[string]uint64

// upstreams stores the number of cached responses and responses from each
// upstream.
upstreams map[string]uint64
// upstreamsTotal stores the number of responses from each upstream.
upstreamsTotal map[string]uint64

// upstreamsTimeSum stores the sum of processing time in milliseconds of
// responses from each upstream.
upstreamsTimeSum map[string]uint64

// nResult stores the number of requests grouped by its result.
nResult []uint64
@@ -106,20 +109,21 @@ type unit struct {
// nTotal stores the total number of requests.
nTotal uint64

// timeSum stores the sum of processing time in milliseconds of each request
// timeSum stores the sum of processing time in microseconds of each request
// written by the unit.
timeSum uint64
}

// newUnit allocates the new *unit.
func newUnit(id uint32) (u *unit) {
return &unit{
domains: map[string]uint64{},
blockedDomains: map[string]uint64{},
clients: map[string]uint64{},
upstreams: map[string]uint64{},
nResult: make([]uint64, resultLast),
id: id,
domains: map[string]uint64{},
blockedDomains: map[string]uint64{},
clients: map[string]uint64{},
upstreamsTotal: map[string]uint64{},
upstreamsTimeSum: map[string]uint64{},
nResult: make([]uint64, resultLast),
id: id,
}
}

@@ -147,8 +151,12 @@ type unitDB struct {
// Clients is the number of requests from each client.
Clients []countPair

// Upstreams is the number of responses from each upstream.
Upstreams []countPair
// UpstreamsTotal is the number of responses from each upstream.
UpstreamsTotal []countPair

// UpstreamsTimeSum is the sum of processing time in milliseconds of
// responses from each upstream.
UpstreamsTimeSum []countPair

// NTotal is the total number of requests.
NTotal uint64
@@ -233,13 +241,14 @@ func (u *unit) serialize() (udb *unitDB) {
}

return &unitDB{
NTotal: u.nTotal,
NResult: append([]uint64{}, u.nResult...),
Domains: convertMapToSlice(u.domains, maxDomains),
BlockedDomains: convertMapToSlice(u.blockedDomains, maxDomains),
Clients: convertMapToSlice(u.clients, maxClients),
Upstreams: convertMapToSlice(u.upstreams, maxUpstreams),
TimeAvg: timeAvg,
NTotal: u.nTotal,
NResult: append([]uint64{}, u.nResult...),
Domains: convertMapToSlice(u.domains, maxDomains),
BlockedDomains: convertMapToSlice(u.blockedDomains, maxDomains),
Clients: convertMapToSlice(u.clients, maxClients),
UpstreamsTotal: convertMapToSlice(u.upstreamsTotal, maxUpstreams),
UpstreamsTimeSum: convertMapToSlice(u.upstreamsTimeSum, maxUpstreams),
TimeAvg: timeAvg,
}
}

@@ -278,7 +287,8 @@ func (u *unit) deserialize(udb *unitDB) {
u.domains = convertSliceToMap(udb.Domains)
u.blockedDomains = convertSliceToMap(udb.BlockedDomains)
u.clients = convertSliceToMap(udb.Clients)
u.upstreams = convertSliceToMap(udb.Upstreams)
u.upstreamsTotal = convertSliceToMap(udb.UpstreamsTotal)
u.upstreamsTimeSum = convertSliceToMap(udb.UpstreamsTimeSum)
u.timeSum = uint64(udb.TimeAvg) * udb.NTotal
}

@@ -292,9 +302,14 @@ func (u *unit) add(e *Entry) {
}

u.clients[e.Client]++
u.timeSum += uint64(e.Time)
// NOTE: Microseconds for compatibility reasons.
u.timeSum += uint64(e.Time.Microseconds())
u.nTotal++
u.upstreams[e.Upstream]++

if e.Upstream != "" {
u.upstreamsTotal[e.Upstream]++
u.upstreamsTimeSum[e.Upstream] += uint64(e.Time.Milliseconds())
}
}

// flushUnitToDB puts udb to the database at id.
@@ -408,10 +423,11 @@ func (s *StatsCtx) getData(limit uint32) (StatsResp, bool) {
return StatsResp{
TimeUnits: "days",

TopBlocked: []topAddrs{},
TopClients: []topAddrs{},
TopQueried: []topAddrs{},
TopUpstreams: []topAddrs{},
TopBlocked: []topAddrs{},
TopClients: []topAddrs{},
TopQueried: []topAddrs{},
TopUpstreamsTotal: []topAddrs{},
TopUpstreamsAvgTime: []topAddrs{},

BlockedFiltering: []uint64{},
DNSQueries: []uint64{},
@@ -435,14 +451,17 @@ func (s *StatsCtx) getData(limit uint32) (StatsResp, bool) {
log.Fatalf("len(dnsQueries) != limit: %d %d", len(dnsQueries), limit)
}

topUpstreamsTotal, topUpstreamsAvgTime := topUpstreamsPairs(units)

data := StatsResp{
DNSQueries: dnsQueries,
BlockedFiltering: statsCollector(units, firstID, timeUnit, func(u *unitDB) (num uint64) { return u.NResult[RFiltered] }),
ReplacedSafebrowsing: statsCollector(units, firstID, timeUnit, func(u *unitDB) (num uint64) { return u.NResult[RSafeBrowsing] }),
ReplacedParental: statsCollector(units, firstID, timeUnit, func(u *unitDB) (num uint64) { return u.NResult[RParental] }),
TopQueried: topsCollector(units, maxDomains, s.ignored, func(u *unitDB) (pairs []countPair) { return u.Domains }),
TopBlocked: topsCollector(units, maxDomains, s.ignored, func(u *unitDB) (pairs []countPair) { return u.BlockedDomains }),
TopUpstreams: topsCollector(units, maxUpstreams, s.ignored, func(u *unitDB) (pairs []countPair) { return u.Upstreams }),
TopUpstreamsTotal: topUpstreamsTotal,
TopUpstreamsAvgTime: topUpstreamsAvgTime,
TopClients: topsCollector(units, maxClients, nil, topClientPairs(s)),
}

@@ -470,7 +489,8 @@ func (s *StatsCtx) getData(limit uint32) (StatsResp, bool) {
data.NumReplacedParental = sum.NResult[RParental]

if timeN != 0 {
data.AvgProcessingTime = float64(sum.TimeAvg/uint32(timeN)) / 1000000
// NOTE: From microseconds to seconds.
data.AvgProcessingTime = float64(sum.TimeAvg/uint32(timeN)) / 1_000_000
}

data.TimeUnits = "hours"
@@ -494,3 +514,38 @@ func topClientPairs(s *StatsCtx) (pg pairsGetter) {
return clients
}
}

// topUpstreamsPairs returns sorted lists of the total number of responses and
// the average processing time for each upstream.
func topUpstreamsPairs(units []*unitDB) (topUpstreamsTotal, topUpstreamsAvgTime []topAddrs) {
upstreamsTotal := topAddrs{}
upstreamsTimeSum := topAddrs{}

for _, u := range units {
for _, cp := range u.UpstreamsTotal {
upstreamsTotal[cp.Name] += cp.Count
}

for _, cp := range u.UpstreamsTimeSum {
upstreamsTimeSum[cp.Name] += cp.Count
}
}

upstreamsAvgTime := make(topAddrs)

for u, n := range upstreamsTotal {
total := upstreamsTimeSum[u]

if total != 0 {
upstreamsAvgTime[u] = total / n
}
}

s := convertMapToSlice(upstreamsTotal, maxUpstreams)
topUpstreamsTotal = convertTopSlice(s)

s = convertMapToSlice(upstreamsAvgTime, maxUpstreams)
topUpstreamsAvgTime = convertTopSlice(s)

return topUpstreamsTotal, topUpstreamsAvgTime
}
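A self-contained sketch of the same averaging step on made-up numbers (plain maps instead of unitDB and countPair, and hypothetical upstream addresses):

package main

import "fmt"

func main() {
    // Per-upstream response counts and millisecond time sums, as if already
    // merged across units.
    total := map[string]uint64{"8.8.8.8:53": 4, "1.1.1.1:53": 2}
    timeSum := map[string]uint64{"8.8.8.8:53": 492, "1.1.1.1:53": 100}

    avg := map[string]uint64{}
    for u, n := range total {
        if sum := timeSum[u]; sum != 0 {
            avg[u] = sum / n // integer average, in milliseconds
        }
    }

    fmt.Println(avg) // map[1.1.1.1:53:50 8.8.8.8:53:123]
}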