Skip to content

Commit

Permalink
Add per URI rate limiting
Browse files Browse the repository at this point in the history
  • Loading branch information
niels committed Dec 20, 2024
1 parent 4581f92 commit 82750f0
Show file tree
Hide file tree
Showing 10 changed files with 395 additions and 75 deletions.
9 changes: 8 additions & 1 deletion cmd/backend/backend_main.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,14 @@ func main() {
bMetrics := backend.CreateBackendMetrics(metricsRegistry)
rMetrics := ratelimit.CreateRatelimiterMetrics(metricsRegistry)

rateLimiter := ratelimit.NewWindowRateLimiter(cfg.Backend.RateLimiter.RateWindow, cfg.Backend.RateLimiter.BucketDuration, cfg.Backend.RateLimiter.MaxRequestsPerWindow, cfg.Backend.RateLimiter.MaxRequestsPerBucket, rMetrics)
rateLimiter := ratelimit.NewWindowRateLimiter(
cfg.Backend.RateLimiter.RateWindow,
cfg.Backend.RateLimiter.BucketDuration,
cfg.Backend.RateLimiter.MaxIPRequestsPerWindow,
cfg.Backend.RateLimiter.MaxIPRequestsPerBucket,
cfg.Backend.RateLimiter.MaxURIRequestsPerWindow,
cfg.Backend.RateLimiter.MaxURIRequestsPerBucket,
rMetrics)
rateLimiter.Start()

var llmResponder responder.Responder
Expand Down
7 changes: 4 additions & 3 deletions config/backend-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,10 @@ whois_manager:
ratelimiter:
rate_window: 1h
bucket_duration: 1m
max_requests_per_window: 500
max_requests_per_bucket: 50

max_ip_requests_per_window: 500
max_ip_requests_per_bucket: 50
max_uri_requests_per_window: 1500
max_uri_requests_per_bucket: 200
ai:
# Whether to enable the responder.
enable_responder: 1
Expand Down
2 changes: 1 addition & 1 deletion config/database.sql
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ CREATE TABLE whois (

-- These need to be kept in sync with pkg/util/constants/shared_constants.go
CREATE TYPE IP_EVENT_TYPE AS ENUM ('UNKNOWN', 'TRAFFIC_CLASS', 'HOSTED_MALWARE', 'SENT_MALWARE', 'RATELIMITED', 'HOST_C2');
CREATE TYPE IP_EVENT_SUB_TYPE AS ENUM ('UNKNOWN', 'NONE', 'MALWARE_NEW', 'MALWARE_OLD', 'RATE_WINDOW', 'RATE_BUCKET', 'TC_SCANNED', 'TC_ATTACKED', 'TC_RECONNED', 'TC_BRUTEFORCED', 'TC_CRAWLED', 'TC_MALICIOUS');
CREATE TYPE IP_EVENT_SUB_TYPE AS ENUM ('UNKNOWN', 'NONE', 'MALWARE_NEW', 'MALWARE_OLD', 'IP_RATE_WINDOW', 'IP_RATE_BUCKET', 'URI_RATE_WINDOW', 'URI_RATE_BUCKET', 'TC_SCANNED', 'TC_ATTACKED', 'TC_RECONNED', 'TC_BRUTEFORCED', 'TC_CRAWLED', 'TC_MALICIOUS');
CREATE TYPE IP_EVENT_SOURCE AS ENUM ('OTHER', 'VT', 'RULE', 'BACKEND', 'ANALYSIS', 'WHOIS', 'AI');
CREATE TYPE IP_EVENT_REF_TYPE AS ENUM ('UNKNOWN', 'NONE', 'REQUEST_ID', 'RULE_ID', 'CONTENT_ID', 'VT_ANALYSIS_ID', 'DOWNLOAD_ID', 'REQUEST_DESCRIPTION_ID', 'REQUEST_SOURCE_IP', 'SESSION_ID', 'APP_ID');
CREATE TABLE ip_event (
Expand Down
19 changes: 13 additions & 6 deletions pkg/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -731,12 +731,19 @@ func (s *BackendServer) HandleProbe(ctx context.Context, req *backend_service.Ha
}

switch err {
case ratelimit.ErrBucketLimitExceeded:
evt.Subtype = constants.IpEventSubTypeRateBucket
s.metrics.rateLimiterRejects.WithLabelValues(RatelimiterRejectReasonBucket).Add(1)
case ratelimit.ErrWindowLimitExceeded:
evt.Subtype = constants.IpEventSubTypeRateWindow
s.metrics.rateLimiterRejects.WithLabelValues(RatelimiterRejectReasonWindow).Add(1)
case ratelimit.ErrIPBucketLimitExceeded:
evt.Subtype = constants.IpEventSubTypeRateIPBucket
s.metrics.rateLimiterRejects.WithLabelValues(RatelimiterRejectReasonIPBucket).Add(1)
case ratelimit.ErrIPWindowLimitExceeded:
evt.Subtype = constants.IpEventSubTypeRateIPWindow
s.metrics.rateLimiterRejects.WithLabelValues(RatelimiterRejectReasonIPWindow).Add(1)
case ratelimit.ErrURIBucketLimitExceeded:
evt.Subtype = constants.IpEventSubTypeRateURIBucket
s.metrics.rateLimiterRejects.WithLabelValues(RatelimiterRejectReasonURIBucket).Add(1)
case ratelimit.ErrURIWindowLimitExceeded:
evt.Subtype = constants.IpEventSubTypeRateURIWindow
s.metrics.rateLimiterRejects.WithLabelValues(RatelimiterRejectReasonURIWindow).Add(1)

default:
slog.Error("error happened in ratelimiter", slog.String("error", err.Error()))
}
Expand Down
12 changes: 7 additions & 5 deletions pkg/backend/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@ type Config struct {
MaxDownloadSizeMB int `fig:"max_download_size_mb" default:"200"`
} `fig:"downloader"`
RateLimiter struct {
RateWindow time.Duration `fig:"rate_window" default:"1h"`
BucketDuration time.Duration `fig:"bucket_duration" default:"1m"`
MaxRequestsPerWindow int `fig:"max_requests_per_window" default:"1000"`
MaxRequestsPerBucket int `fig:"max_requests_per_bucket" default:"50"`
RateWindow time.Duration `fig:"rate_window" default:"1h"`
BucketDuration time.Duration `fig:"bucket_duration" default:"1m"`
MaxIPRequestsPerWindow int `fig:"max_ip_requests_per_window" default:"1000"`
MaxIPRequestsPerBucket int `fig:"max_ip_requests_per_bucket" default:"50"`
MaxURIRequestsPerWindow int `fig:"max_uri_requests_per_window" default:"2000"`
MaxURIRequestsPerBucket int `fig:"max_uri_requests_per_bucket" default:"100"`
} `fig:"ratelimiter"`

Advanced struct {
Expand Down Expand Up @@ -110,7 +112,7 @@ type Config struct {
LLMCompletionTimeout time.Duration `fig:"llm_completion_timeout" default:"1m"`
LLMConcurrentRequests int `fig:"llm_concurrent_requests" default:"5"`
MaxInputCharacters int `fig:"max_input_characters" default:"4096"`
Triage struct {
Triage struct {
Enable bool `fig:"enable"`
LogFile string `fig:"log_file" default:"triage.log" `
LogLevel string `fig:"log_level" default:"debug" `
Expand Down
6 changes: 4 additions & 2 deletions pkg/backend/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ import (
)

var (
RatelimiterRejectReasonWindow = "window"
RatelimiterRejectReasonBucket = "bucket"
RatelimiterRejectReasonIPWindow = "ip_window"
RatelimiterRejectReasonIPBucket = "ip_bucket"
RatelimiterRejectReasonURIWindow = "uri_window"
RatelimiterRejectReasonURIBucket = "uri_bucket"
)

type BackendMetrics struct {
Expand Down
18 changes: 12 additions & 6 deletions pkg/backend/ratelimit/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,33 @@
// You should have received a copy of the GNU General Public License along
// with this program; if not, write to the Free Software Foundation, Inc.,
// 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
//
package ratelimit

import (
"github.com/prometheus/client_golang/prometheus"
)

type RatelimiterMetrics struct {
rateBucketsGauge prometheus.Gauge
ipRateBucketsGauge prometheus.Gauge
uriRateBucketsGauge prometheus.Gauge
}

// Register Metrics
func CreateRatelimiterMetrics(reg prometheus.Registerer) *RatelimiterMetrics {
m := &RatelimiterMetrics{
rateBucketsGauge: prometheus.NewGauge(
ipRateBucketsGauge: prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "lophiid_backend_ratelimit_ip_buckets_gauge",
Help: "The amount of active IP ratelimit buckets"},
),
uriRateBucketsGauge: prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "lophiid_backend_ratelimit_buckets_gauge",
Help: "The amount of active ratelimit buckets"},
Name: "lophiid_backend_ratelimit_uri_buckets_gauge",
Help: "The amount of active URI ratelimit buckets"},
),
}

reg.MustRegister(m.rateBucketsGauge)
reg.MustRegister(m.ipRateBucketsGauge)
reg.MustRegister(m.uriRateBucketsGauge)
return m
}
144 changes: 103 additions & 41 deletions pkg/backend/ratelimit/ratelimit.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ import (
)

var (
ErrBucketLimitExceeded = errors.New("bucket limit exceeded")
ErrWindowLimitExceeded = errors.New("window limit exceeded")
ErrIPBucketLimitExceeded = errors.New("IP bucket limit exceeded")
ErrIPWindowLimitExceeded = errors.New("IP window limit exceeded")
ErrURIBucketLimitExceeded = errors.New("URI bucket limit exceeded")
ErrURIWindowLimitExceeded = errors.New("URI window limit exceeded")
)

type RateLimiter interface {
Expand All @@ -46,28 +48,35 @@ type RateLimiter interface {
//
// Requires Start() to be called before usage.
type WindowRateLimiter struct {
MaxRequestsPerWindow int
MaxRequestPerBucket int
RateWindow time.Duration
BucketDuration time.Duration
NumberBuckets int
RateBuckets map[string][]int
Metrics *RatelimiterMetrics
rateMu sync.Mutex
bgChan chan bool
MaxIPRequestsPerWindow int
MaxIPRequestPerBucket int
MaxURIRequestsPerWindow int
MaxURIRequestPerBucket int
RateWindow time.Duration
BucketDuration time.Duration
NumberBuckets int
IPRateBuckets map[string][]int
URIRateBuckets map[string][]int
Metrics *RatelimiterMetrics
rateIPMu sync.Mutex
rateURIMu sync.Mutex
bgChan chan bool
}

func NewWindowRateLimiter(rateWindow time.Duration, bucketDuration time.Duration, maxRequestsPerWindow int, maxRequestPerBucket int, metrics *RatelimiterMetrics) *WindowRateLimiter {
func NewWindowRateLimiter(rateWindow time.Duration, bucketDuration time.Duration, maxIpRequestsPerWindow int, maxIpRequestPerBucket int, maxUriRequestsPerWindow int, maxUriRequestPerBucket int, metrics *RatelimiterMetrics) *WindowRateLimiter {
slog.Info("Creating ratelimiter", slog.String("window_size", rateWindow.String()), slog.String("bucket_size", bucketDuration.String()))
return &WindowRateLimiter{
BucketDuration: bucketDuration,
MaxRequestPerBucket: maxRequestPerBucket,
MaxRequestsPerWindow: maxRequestsPerWindow,
RateWindow: rateWindow,
RateBuckets: make(map[string][]int),
NumberBuckets: int(rateWindow / bucketDuration),
Metrics: metrics,
bgChan: make(chan bool),
BucketDuration: bucketDuration,
MaxIPRequestPerBucket: maxIpRequestPerBucket,
MaxIPRequestsPerWindow: maxIpRequestsPerWindow,
MaxURIRequestPerBucket: maxUriRequestPerBucket,
MaxURIRequestsPerWindow: maxUriRequestsPerWindow,
RateWindow: rateWindow,
IPRateBuckets: make(map[string][]int),
URIRateBuckets: make(map[string][]int),
NumberBuckets: int(rateWindow / bucketDuration),
Metrics: metrics,
bgChan: make(chan bool),
}
}

Expand Down Expand Up @@ -104,52 +113,105 @@ func GetSumOfWindow(window []int) int {
// bucket while removing windows where all buckets are 0 (basically no traffic
// seen).
func (r *WindowRateLimiter) Tick() {
r.rateMu.Lock()
defer r.rateMu.Unlock()
r.rateIPMu.Lock()
for k := range r.IPRateBuckets {
r.IPRateBuckets[k] = r.IPRateBuckets[k][1:]
r.IPRateBuckets[k] = append(r.IPRateBuckets[k], 0)

for k := range r.RateBuckets {
r.RateBuckets[k] = r.RateBuckets[k][1:]
r.RateBuckets[k] = append(r.RateBuckets[k], 0)
if GetSumOfWindow(r.IPRateBuckets[k]) == 0 {
delete(r.IPRateBuckets, k)
}
}
r.rateIPMu.Unlock()

if GetSumOfWindow(r.RateBuckets[k]) == 0 {
delete(r.RateBuckets, k)
r.rateURIMu.Lock()
for k := range r.URIRateBuckets {
r.URIRateBuckets[k] = r.URIRateBuckets[k][1:]
r.URIRateBuckets[k] = append(r.URIRateBuckets[k], 0)

if GetSumOfWindow(r.URIRateBuckets[k]) == 0 {
delete(r.URIRateBuckets, k)
}
}
r.Metrics.rateBucketsGauge.Set(float64(len(r.RateBuckets)))
r.rateURIMu.Unlock()

r.Metrics.ipRateBucketsGauge.Set(float64(len(r.IPRateBuckets)))
r.Metrics.uriRateBucketsGauge.Set(float64(len(r.URIRateBuckets)))
}

// AllowRequest will return true if a request is allowed because the total
// requests in a window or bucket is not exceeded. If a request is not allowed
// then an error is returned with the reason why.
// Requires that Start() has been called before usage.
func (r *WindowRateLimiter) AllowRequest(req *models.Request) (bool, error) {
rKey := fmt.Sprintf("%s-%d-%s", req.HoneypotIP, req.Port, req.SourceIP)
ret, err := r.allowRequestForIP(req)
if !ret {
return ret, err
}

return r.allowRequestForURI(req)
}

r.rateMu.Lock()
defer r.rateMu.Unlock()
func (r *WindowRateLimiter) allowRequestForIP(req *models.Request) (bool, error) {

_, ok := r.RateBuckets[rKey]
ipRateKey := fmt.Sprintf("%s-%d-%s", req.HoneypotIP, req.Port, req.SourceIP)
r.rateIPMu.Lock()
defer r.rateIPMu.Unlock()

_, ok := r.IPRateBuckets[ipRateKey]
// If the key is not present then this IP has no recent requests logged so we
// create the buckets.
if !ok {
r.RateBuckets[rKey] = make([]int, r.NumberBuckets)
r.RateBuckets[rKey][r.NumberBuckets-1] = 1
r.IPRateBuckets[ipRateKey] = make([]int, r.NumberBuckets)
r.IPRateBuckets[ipRateKey][r.NumberBuckets-1] = 1
return true, nil
}

// Check how many requests there have been in this window.
if GetSumOfWindow(r.IPRateBuckets[ipRateKey]) >= r.MaxIPRequestsPerWindow {
r.IPRateBuckets[ipRateKey][r.NumberBuckets-1] += 1
return false, ErrIPWindowLimitExceeded
}

// Check if the bucket limit is not already exceeded.
if r.IPRateBuckets[ipRateKey][r.NumberBuckets-1] >= r.MaxIPRequestPerBucket {
r.IPRateBuckets[ipRateKey][r.NumberBuckets-1] += 1
return false, ErrIPBucketLimitExceeded
}

r.IPRateBuckets[ipRateKey][r.NumberBuckets-1] += 1

return true, nil
}

func (r *WindowRateLimiter) allowRequestForURI(req *models.Request) (bool, error) {
uriRateKey := req.BaseHash

r.rateURIMu.Lock()
defer r.rateURIMu.Unlock()

_, ok := r.URIRateBuckets[uriRateKey]
// If the key is not present then this URI has no recent requests logged so we
// create the buckets.
if !ok {
r.URIRateBuckets[uriRateKey] = make([]int, r.NumberBuckets)
r.URIRateBuckets[uriRateKey][r.NumberBuckets-1] = 1
return true, nil
}

// Check how many requests there have been in this window.
if GetSumOfWindow(r.RateBuckets[rKey]) >= r.MaxRequestsPerWindow {
r.RateBuckets[rKey][r.NumberBuckets-1] += 1
return false, ErrWindowLimitExceeded
if GetSumOfWindow(r.URIRateBuckets[uriRateKey]) >= r.MaxURIRequestsPerWindow {
r.URIRateBuckets[uriRateKey][r.NumberBuckets-1] += 1
return false, ErrURIWindowLimitExceeded
}

// Check if the bucket limit is not already exceeded.
if r.RateBuckets[rKey][r.NumberBuckets-1] >= r.MaxRequestPerBucket {
r.RateBuckets[rKey][r.NumberBuckets-1] += 1
return false, ErrBucketLimitExceeded
if r.URIRateBuckets[uriRateKey][r.NumberBuckets-1] >= r.MaxURIRequestPerBucket {
r.URIRateBuckets[uriRateKey][r.NumberBuckets-1] += 1
return false, ErrURIBucketLimitExceeded
}

r.RateBuckets[rKey][r.NumberBuckets-1] += 1
r.URIRateBuckets[uriRateKey][r.NumberBuckets-1] += 1

return true, nil
}
Expand Down
Loading

0 comments on commit 82750f0

Please sign in to comment.