Skip to content

Commit

Permalink
proxy: imp code
Browse files Browse the repository at this point in the history
  • Loading branch information
schzhn committed Jan 22, 2025
1 parent ef492d4 commit b877057
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 57 deletions.
2 changes: 1 addition & 1 deletion proxy/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (p *Proxy) exchangeUpstreams(
return nil, nil, err
}

// TODO(e.burkov): p.updateRTT(u.Address(), elapsed)
// TODO(e.burkov): Consider updating the RTT of a single upstream.

return resp, u, err
}
Expand Down
4 changes: 2 additions & 2 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,7 @@ func (p *Proxy) replyFromUpstream(d *DNSContext) (ok bool, err error) {
}

src := "upstream"
wrapped := upstreamsWithStats(upstreams, false)
wrapped := upstreamsWithStats(upstreams)

// Perform the DNS request.
resp, u, err := p.exchangeUpstreams(req, wrapped)
Expand All @@ -576,7 +576,7 @@ func (p *Proxy) replyFromUpstream(d *DNSContext) (ok bool, err error) {
// creating proxy.
upstreams = p.Fallbacks.getUpstreamsForDomain(req.Question[0].Name)

wrappedFallbacks = upstreamsWithStats(upstreams, true)
wrappedFallbacks = upstreamsWithStats(upstreams)
resp, u, err = upstream.ExchangeParallel(wrappedFallbacks, req)
}

Expand Down
77 changes: 23 additions & 54 deletions proxy/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,6 @@ type upstreamWithStats struct {

// queryDuration is the duration of the successful DNS lookup.
queryDuration time.Duration

// isFallback indicates whether the upstream is a fallback upstream.
isFallback bool
}

// newUpstreamWithStats returns a new initialized *upstreamWithStats.
func newUpstreamWithStats(upstream upstream.Upstream, isFallback bool) (u *upstreamWithStats) {
return &upstreamWithStats{
upstream: upstream,
isFallback: isFallback,
}
}

// type check
Expand Down Expand Up @@ -57,14 +46,10 @@ func (u *upstreamWithStats) Close() (err error) {

// upstreamsWithStats takes a list of upstreams, wraps each upstream with
// [upstreamWithStats] to gather statistics, and returns the wrapped upstreams.
func upstreamsWithStats(
upstreams []upstream.Upstream,
isFallback bool,
) (wrapped []upstream.Upstream) {
func upstreamsWithStats(upstreams []upstream.Upstream) (wrapped []upstream.Upstream) {
wrapped = make([]upstream.Upstream, 0, len(upstreams))
for _, u := range upstreams {
w := newUpstreamWithStats(u, isFallback)
wrapped = append(wrapped, w)
wrapped = append(wrapped, &upstreamWithStats{upstream: u})
}

return wrapped
Expand Down Expand Up @@ -130,53 +115,39 @@ func collectQueryStats(
wrapped, ok = resolver.(*upstreamWithStats)
if !ok {
// Should never happen.
err := fmt.Errorf("unexpected type %T", resolver)
panic(err)
panic(fmt.Errorf("unexpected type %T", resolver))
}

unwrapped = wrapped.upstream
}

// The DNS query was not resolved.
if wrapped == nil {
return unwrapped, &QueryStatistics{
main: collectUpstreamStats(upstreams),
fallback: collectUpstreamStats(fallbacks),
return nil, &QueryStatistics{
main: collectUpstreamStats(upstreams...),
fallback: collectUpstreamStats(fallbacks...),
}
}

if mode == UpstreamModeFastestAddr && !wrapped.isFallback {
// The DNS query was successfully resolved by main resolver and the upstream
// mode is [UpstreamModeFastestAddr].
if mode == UpstreamModeFastestAddr && len(fallbacks) == 0 {
return unwrapped, &QueryStatistics{
main: collectUpstreamStats(upstreams),
main: collectUpstreamStats(upstreams...),
}
}

return unwrapped, collectResolverQueryStats(upstreams, wrapped)
}

// collectResolverQueryStats gathers the statistics from an upstream DNS
// resolver that successfully resolved the request. If resolver is the fallback
// DNS resolver, it also gathers the statistics for the upstream DNS resolvers.
// resolver must be not nil.
func collectResolverQueryStats(
upstreams []upstream.Upstream,
resolver *upstreamWithStats,
) (stats *QueryStatistics) {
dur, err := resolver.queryDuration, resolver.err
s := &UpstreamStatistics{
Address: resolver.upstream.Address(),
Error: err,
QueryDuration: dur,
}

if resolver.isFallback {
return &QueryStatistics{
main: collectUpstreamStats(upstreams),
fallback: []*UpstreamStatistics{s},
// The DNS query was resolved by fallback resolver.
if len(fallbacks) > 0 {
return unwrapped, &QueryStatistics{
main: collectUpstreamStats(upstreams...),
fallback: collectUpstreamStats(wrapped),
}
}

return &QueryStatistics{
main: []*UpstreamStatistics{s},
// The DNS query was successfully resolved by main resolver.
return unwrapped, &QueryStatistics{
main: collectUpstreamStats(wrapped),
}
}

Expand All @@ -200,22 +171,20 @@ type UpstreamStatistics struct {

// collectUpstreamStats gathers the upstream statistics from the list of wrapped
// upstreams. upstreams must be of type *upstreamWithStats.
func collectUpstreamStats(upstreams []upstream.Upstream) (stats []*UpstreamStatistics) {
func collectUpstreamStats(upstreams ...upstream.Upstream) (stats []*UpstreamStatistics) {
stats = make([]*UpstreamStatistics, 0, len(upstreams))

for _, u := range upstreams {
w, ok := u.(*upstreamWithStats)
if !ok {
// Should never happen.
err := fmt.Errorf("unexpected type %T", u)
panic(err)
panic(fmt.Errorf("unexpected type %T", u))
}

dur, err := w.queryDuration, w.err
stats = append(stats, &UpstreamStatistics{
Error: err,
Error: w.err,
Address: w.Address(),
QueryDuration: dur,
QueryDuration: w.queryDuration,
})
}

Expand Down

0 comments on commit b877057

Please sign in to comment.