Skip to content

Commit

Permalink
feat(router): support for isolation modes using limiters (#3379)
Browse files Browse the repository at this point in the history
  • Loading branch information
atzoum authored Jun 8, 2023
1 parent 9e32802 commit fbe109f
Show file tree
Hide file tree
Showing 134 changed files with 3,093 additions and 3,851 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ jobs:
uses: actions/checkout@v3
- uses: actions/setup-go@v4
with:
go-version: '~1.20.3'
go-version: '~1.20.4'
check-latest: true
- run: go version
- run: go mod download # Not required, used to segregate module download vs test times
Expand Down Expand Up @@ -75,7 +75,7 @@ jobs:
- name: Setup Go
uses: actions/setup-go@v4
with:
go-version: '~1.20.3'
go-version: '~1.20.4'
check-latest: true
- run: go version
- run: go mod download # Not required, used to segregate module download vs test times
Expand Down Expand Up @@ -107,7 +107,7 @@ jobs:
- uses: actions/checkout@v3
- uses: actions/setup-go@v4
with:
go-version: '~1.20.3'
go-version: '~1.20.4'
check-latest: true

- run: go version
Expand Down Expand Up @@ -156,7 +156,7 @@ jobs:
- name: Setup Go
uses: actions/setup-go@v4
with:
go-version: '~1.20.3'
go-version: '~1.20.4'
check-latest: true
- name: Download coverage reports
uses: actions/download-artifact@v3
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/verify.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
- uses: actions/setup-go@v4
with:
check-latest: true
go-version: '~1.20.3'
go-version: '~1.20.4'
- run: go version

- run: go mod tidy
Expand Down Expand Up @@ -55,7 +55,7 @@ jobs:
- uses: actions/checkout@v3
- uses: actions/setup-go@v4
with:
go-version: '~1.20.3'
go-version: '~1.20.4'
check-latest: true
- name: golangci-lint
uses: golangci/golangci-lint-action@v3
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ install-tools:
go install mvdan.cc/gofumpt@latest
go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.28.1
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.2.0
go install gotest.tools/gotestsum@v1.8.2
go install gotest.tools/gotestsum@v1.10.0

.PHONY: lint
lint: fmt ## Run linters on all go files
Expand Down
4 changes: 2 additions & 2 deletions admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ import (
"github.com/spf13/viper"

"github.com/rudderlabs/rudder-go-kit/config"
kithttputil "github.com/rudderlabs/rudder-go-kit/httputil"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-server/utils/httputil"
"github.com/rudderlabs/rudder-server/utils/misc"
)

Expand Down Expand Up @@ -170,5 +170,5 @@ func StartServer(ctx context.Context) error {

srv := &http.Server{Handler: srvMux, ReadHeaderTimeout: 3 * time.Second}

return httputil.Serve(ctx, srv, l, time.Second)
return kithttputil.Serve(ctx, srv, l, time.Second)
}
90 changes: 0 additions & 90 deletions admin/profiler/profiler.go

This file was deleted.

3 changes: 2 additions & 1 deletion app/apphandlers/embeddedAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
return fmt.Errorf("failed to create rt throttler factory: %w", err)
}
rtFactory := &router.Factory{
Logger: logger.NewLogger().Child("router"),
Reporting: reportingI,
Multitenant: multitenantStats,
BackendConfig: backendconfig.DefaultBackendConfig,
Expand All @@ -249,7 +250,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
Debugger: destinationHandle,
AdaptiveLimit: adaptiveLimit,
}
rt := routerManager.New(rtFactory, brtFactory, backendconfig.DefaultBackendConfig)
rt := routerManager.New(rtFactory, brtFactory, backendconfig.DefaultBackendConfig, logger.NewLogger())

dm := cluster.Dynamic{
Provider: modeProvider,
Expand Down
7 changes: 4 additions & 3 deletions app/apphandlers/processorAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ import (
"github.com/rudderlabs/rudder-server/internal/pulsar"
"github.com/rudderlabs/rudder-server/router/throttler"
schema_forwarder "github.com/rudderlabs/rudder-server/schema-forwarder"
"github.com/rudderlabs/rudder-server/utils/httputil"
"github.com/rudderlabs/rudder-server/utils/payload"
"github.com/rudderlabs/rudder-server/utils/types/deployment"

"golang.org/x/sync/errgroup"

"github.com/bugsnag/bugsnag-go/v2"

kithttputil "github.com/rudderlabs/rudder-go-kit/httputil"
"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-server/app"
"github.com/rudderlabs/rudder-server/app/cluster"
Expand Down Expand Up @@ -235,6 +235,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
return fmt.Errorf("failed to create throttler factory: %w", err)
}
rtFactory := &router.Factory{
Logger: logger.NewLogger().Child("router"),
Reporting: reportingI,
Multitenant: multitenantStats,
BackendConfig: backendconfig.DefaultBackendConfig,
Expand All @@ -257,7 +258,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
Debugger: destinationHandle,
AdaptiveLimit: adaptiveLimit,
}
rt := routerManager.New(rtFactory, brtFactory, backendconfig.DefaultBackendConfig)
rt := routerManager.New(rtFactory, brtFactory, backendconfig.DefaultBackendConfig, logger.NewLogger())

dm := cluster.Dynamic{
Provider: modeProvider,
Expand Down Expand Up @@ -314,5 +315,5 @@ func (a *processorApp) startHealthWebHandler(ctx context.Context, db *jobsdb.Han
MaxHeaderBytes: a.config.http.MaxHeaderBytes,
}

return httputil.ListenAndServe(ctx, srv)
return kithttputil.ListenAndServe(ctx, srv)
}
4 changes: 2 additions & 2 deletions app/cluster/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,6 @@ func initJobsDB() {
jobsdb.Init()
jobsdb.Init2()
archiver.Init()
router.Init()
Init()
}

Expand Down Expand Up @@ -223,6 +222,7 @@ func TestDynamicClusterManager(t *testing.T) {

tDb := &jobsdb.MultiTenantHandleT{HandleT: rtDB}
rtFactory := &router.Factory{
Logger: logger.NOP,
Reporting: &reporting.NOOP{},
Multitenant: mockMTI,
BackendConfig: mockBackendConfig,
Expand All @@ -240,7 +240,7 @@ func TestDynamicClusterManager(t *testing.T) {
TransientSources: transientsource.NewEmptyService(),
RsourcesService: mockRsourcesService,
}
router := routermanager.New(rtFactory, brtFactory, mockBackendConfig)
router := routermanager.New(rtFactory, brtFactory, mockBackendConfig, logger.NewLogger())

mockBackendConfig.EXPECT().Subscribe(gomock.Any(), gomock.Any()).DoAndReturn(func(
ctx context.Context, topic backendConfig.Topic,
Expand Down
4 changes: 2 additions & 2 deletions backend-config/namespace_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ import (
jsoniter "github.com/json-iterator/go"

"github.com/rudderlabs/rudder-go-kit/config"
kithttputil "github.com/rudderlabs/rudder-go-kit/httputil"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-server/services/controlplane/identity"
"github.com/rudderlabs/rudder-server/utils/httputil"
"github.com/rudderlabs/rudder-server/utils/types"
)

Expand Down Expand Up @@ -183,7 +183,7 @@ func (nc *namespaceConfig) makeHTTPRequest(req *http.Request) ([]byte, error) {
return nil, err
}

defer func() { httputil.CloseResponse(resp) }()
defer func() { kithttputil.CloseResponse(resp) }()
respBody, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions backend-config/single_workspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import (
"github.com/cenkalti/backoff"

"github.com/rudderlabs/rudder-go-kit/config"
kithttputil "github.com/rudderlabs/rudder-go-kit/httputil"
"github.com/rudderlabs/rudder-server/services/controlplane/identity"
"github.com/rudderlabs/rudder-server/utils/httputil"
"github.com/rudderlabs/rudder-server/utils/types"
)

Expand Down Expand Up @@ -151,7 +151,7 @@ func (wc *singleWorkspaceConfig) makeHTTPRequest(ctx context.Context, url string
return nil, err
}

defer func() { httputil.CloseResponse(resp) }()
defer func() { kithttputil.CloseResponse(resp) }()
respBody, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
Expand Down
6 changes: 3 additions & 3 deletions gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (

"github.com/rudderlabs/rudder-go-kit/chiware"
"github.com/rudderlabs/rudder-go-kit/config"
kithttputil "github.com/rudderlabs/rudder-go-kit/httputil"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-server/app"
Expand All @@ -47,7 +48,6 @@ import (
"github.com/rudderlabs/rudder-server/services/diagnostics"
"github.com/rudderlabs/rudder-server/services/rsources"
rsources_http "github.com/rudderlabs/rudder-server/services/rsources/http"
rs_httputil "github.com/rudderlabs/rudder-server/utils/httputil"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/utils/types"
)
Expand Down Expand Up @@ -1384,7 +1384,7 @@ func (gateway *HandleT) StartWebHandler(ctx context.Context) error {
MaxHeaderBytes: maxHeaderBytes,
}

return rs_httputil.ListenAndServe(ctx, gateway.httpWebServer)
return kithttputil.ListenAndServe(ctx, gateway.httpWebServer)
}

// StartAdminHandler for Admin Operations
Expand All @@ -1404,7 +1404,7 @@ func (gateway *HandleT) StartAdminHandler(ctx context.Context) error {
ReadHeaderTimeout: ReadHeaderTimeout,
}

return rs_httputil.ListenAndServe(ctx, srv)
return kithttputil.ListenAndServe(ctx, srv)
}

// Gets the config from config backend and extracts enabled writekeys
Expand Down
8 changes: 4 additions & 4 deletions gateway/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"github.com/ory/dockertest/v3"
"github.com/stretchr/testify/require"

kitHelper "github.com/rudderlabs/rudder-go-kit/testhelper"
kithelper "github.com/rudderlabs/rudder-go-kit/testhelper"
"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource"
"github.com/rudderlabs/rudder-go-kit/testhelper/rand"
"github.com/rudderlabs/rudder-server/app"
Expand Down Expand Up @@ -117,11 +117,11 @@ func testGatewayByAppType(t *testing.T, appType string) {
t.Logf("BackendConfig server listening on: %s", backendConfigSrv.URL)
t.Cleanup(backendConfigSrv.Close)

httpPort, err := kitHelper.GetFreePort()
httpPort, err := kithelper.GetFreePort()
require.NoError(t, err)
httpAdminPort, err := kitHelper.GetFreePort()
httpAdminPort, err := kithelper.GetFreePort()
require.NoError(t, err)
debugPort, err := kitHelper.GetFreePort()
debugPort, err := kithelper.GetFreePort()
require.NoError(t, err)

rudderTmpDir, err := os.MkdirTemp("", "rudder_server_*_test")
Expand Down
Loading

0 comments on commit fbe109f

Please sign in to comment.