Skip to content

Commit

Permalink
chore: applying 1.9.5 & 1.9.6 hotfixes to main branch (#3486)
Browse files Browse the repository at this point in the history
  • Loading branch information
atzoum authored Jun 13, 2023
1 parent dacfdc8 commit c23ffb2
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 68 deletions.
14 changes: 14 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,19 @@
# Changelog

## [1.9.6](https://github.com/rudderlabs/rudder-server/compare/v1.9.5...v1.9.6) (2023-06-09)


### Bug Fixes

* warehouse proxy endpoints ([#3476](https://github.com/rudderlabs/rudder-server/issues/3476)) ([fda977f](https://github.com/rudderlabs/rudder-server/commit/fda977f9e440aaa21337ff620c8fb8fe68385b2a))

## [1.9.5](https://github.com/rudderlabs/rudder-server/compare/v1.9.4...v1.9.5) (2023-06-07)


### Bug Fixes

* flag for warehouse altering ([#3460](https://github.com/rudderlabs/rudder-server/issues/3460)) ([472d310](https://github.com/rudderlabs/rudder-server/commit/472d3102fd04e91da2e832737dbb42ba2671dc3c))

## [1.9.4](https://github.com/rudderlabs/rudder-server/compare/v1.9.3...v1.9.4) (2023-06-06)


Expand Down
27 changes: 9 additions & 18 deletions gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -956,21 +956,6 @@ func (gateway *HandleT) beaconBatchHandler(w http.ResponseWriter, r *http.Reques
gateway.beaconHandler(w, r, "batch")
}

func warehouseHandler(w http.ResponseWriter, r *http.Request) {
origin, err := url.Parse(misc.GetWarehouseURL())
if err != nil {
http.Error(w, err.Error(), 404)
return
}
// gateway.logger.LogRequest(r)
director := func(req *http.Request) {
req.URL.Scheme = "http"
req.URL.Host = origin.Host
}
proxy := &httputil.ReverseProxy{Director: director}
proxy.ServeHTTP(w, r)
}

// ProcessRequest throws a webRequest into the queue and waits for the response before returning
func (*RegularRequestHandler) ProcessRequest(gateway *HandleT, w *http.ResponseWriter, r *http.Request, reqType string, payload []byte, writeKey string) string {
done := make(chan string, 1)
Expand Down Expand Up @@ -1327,11 +1312,17 @@ func (gateway *HandleT) StartWebHandler(ctx context.Context) error {
r.Post("/screen", gateway.webScreenHandler)
r.Post("/track", gateway.webTrackHandler)
r.Post("/webhook", gateway.webhookHandler.RequestHandler)
r.Post("/warehouse", warehouseHandler)
r.Post("/warehouse/pending-events", gateway.whProxy.ServeHTTP)

r.Get("/webhook", gateway.webhookHandler.RequestHandler)
r.Get("/warehouse", warehouseHandler)

r.Route("/warehouse", func(r chi.Router) {
r.Post("/pending-events", gateway.whProxy.ServeHTTP)
r.Post("/trigger-upload", gateway.whProxy.ServeHTTP)
r.Post("/jobs", gateway.whProxy.ServeHTTP)
r.Post("/fetch-tables", gateway.whProxy.ServeHTTP)

r.Get("/jobs/status", gateway.whProxy.ServeHTTP)
})
})

srvMux.Get("/health", WithContentType("application/json; charset=utf-8", app.LivenessHandler(gateway.jobsDB)))
Expand Down
78 changes: 28 additions & 50 deletions gateway/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,12 @@ import (
"net/http/httptest"
"net/url"
"reflect"
"strconv"
"testing"
"time"

kithelper "github.com/rudderlabs/rudder-go-kit/testhelper"

"github.com/golang/mock/gomock"
"github.com/google/uuid"
. "github.com/onsi/ginkgo/v2"
Expand Down Expand Up @@ -350,10 +353,18 @@ var _ = Describe("Gateway", func() {
w.WriteHeader(http.StatusOK)
_, _ = io.WriteString(w, "OK")
}))
serverURL = whServer.URL
parsedURL, err := url.Parse(serverURL)
WHURL := whServer.URL
parsedURL, err := url.Parse(WHURL)
Expect(err).To(BeNil())
whPort := parsedURL.Port()
GinkgoT().Setenv("RSERVER_WAREHOUSE_WEB_PORT", whPort)

serverPort, err := kithelper.GetFreePort()
Expect(err).To(BeNil())
config.Set("Warehouse.webPort", parsedURL.Port())
serverURL = fmt.Sprintf("http://localhost:%d", serverPort)
GinkgoT().Setenv("RSERVER_GATEWAY_WEB_PORT", strconv.Itoa(serverPort))

loadConfig()

gateway = &HandleT{}
err = gateway.Setup(context.Background(), c.mockApp, c.mockBackendConfig, c.mockJobsDB, nil, c.mockVersionHandler, rsources.NewNoOpService(), sourcedebugger.NewNoOpService())
Expand Down Expand Up @@ -381,7 +392,6 @@ var _ = Describe("Gateway", func() {
}
verifyEndpoint := func(endpoints []string, method string) {
client := &http.Client{}
// baseURL := "http://localhost:8080"
for _, ep := range endpoints {
url := fmt.Sprintf("%s%s", serverURL, ep)
var req *http.Request
Expand Down Expand Up @@ -420,11 +430,14 @@ var _ = Describe("Gateway", func() {
close(wait)
}()
Eventually(func() bool {
resp, _ := http.Get(fmt.Sprintf("%s/version", serverURL))
resp, err := http.Get(fmt.Sprintf("%s/version", serverURL))
if err != nil {
return false
}
return resp.StatusCode == http.StatusOK
}, time.Second*10, time.Second).Should(BeTrue())

getEndpoint, postEndpoints, deleteEndpoints := getEndpointMethodMap()
getEndpoint, postEndpoints, deleteEndpoints := endpointsToVerify()
verifyEndpoint(getEndpoint, http.MethodGet)
verifyEndpoint(postEndpoints, http.MethodPost)
verifyEndpoint(deleteEndpoints, http.MethodDelete)
Expand Down Expand Up @@ -987,43 +1000,6 @@ var _ = Describe("Gateway", func() {
})
})

Context("Warehouse proxy", func() {
DescribeTable("forwarding requests to warehouse with different response codes",
func(url string, code int, payload string) {
gateway := &HandleT{}
whMock := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expect(r.URL.String()).To(Equal(url))
Expect(r.Body)
Expect(r.Body).To(Not(BeNil()))
defer func() { _ = r.Body.Close() }()
reqBody, err := io.ReadAll(r.Body)
Expect(err).To(BeNil())
Expect(string(reqBody)).To(Equal(payload))
w.WriteHeader(code)
}))
GinkgoT().Setenv("WAREHOUSE_URL", whMock.URL)
GinkgoT().Setenv("RSERVER_WAREHOUSE_MODE", config.OffMode)
err := gateway.Setup(context.Background(), c.mockApp, c.mockBackendConfig, c.mockJobsDB, nil, c.mockVersionHandler, rsources.NewNoOpService(), sourcedebugger.NewNoOpService())
Expect(err).To(BeNil())

defer func() {
err := gateway.Shutdown()
Expect(err).To(BeNil())
whMock.Close()
}()

req := httptest.NewRequest("POST", "http://rudder-server"+url, bytes.NewBufferString(payload))
w := httptest.NewRecorder()
gateway.whProxy.ServeHTTP(w, req)
resp := w.Result()
Expect(resp.StatusCode).To(Equal(code))
},
Entry("successful request", "/v1/warehouse/pending-events", http.StatusOK, `{"source_id": "1", "task_run_id":"2"}`),
Entry("failed request", "/v1/warehouse/pending-events", http.StatusBadRequest, `{"source_id": "3", "task_run_id":"4"}`),
Entry("request with query parameters", "/v1/warehouse/pending-events?triggerUpload=true", http.StatusOK, `{"source_id": "5", "task_run_id":"6"}`),
)
})

Context("jobDataFromRequest", func() {
var (
gateway *HandleT
Expand Down Expand Up @@ -1161,19 +1137,19 @@ func expectHandlerResponse(handler http.HandlerFunc, req *http.Request, response
}

// return all endpoints as key and method as value
func getEndpointMethodMap() (getEndpoints, postEndpoints, deleteEndpoints []string) {
getEndpoints = []string{
func endpointsToVerify() ([]string, []string, []string) {
getEndpoints := []string{
"/version",
"/robots.txt",
"/pixel/v1/track",
"/pixel/v1/page",
"/v1/warehouse",
"/v1/webhook",
"/v1/job-status/123",
"/v1/job-status/123/failed-records",
"/v1/warehouse/jobs/status",
}

postEndpoints = []string{
postEndpoints := []string{
"/v1/batch",
"/v1/identify",
"/v1/track",
Expand All @@ -1185,16 +1161,18 @@ func getEndpointMethodMap() (getEndpoints, postEndpoints, deleteEndpoints []stri
"/v1/import",
"/v1/audiencelist",
"/v1/webhook",
"/v1/warehouse",
"/beacon/v1/batch",
"/internal/v1/extract",
"/v1/warehouse/pending-events",
"/v1/warehouse/trigger-upload",
"/v1/warehouse/jobs",
"/v1/warehouse/fetch-tables",
}

deleteEndpoints = []string{
deleteEndpoints := []string{
"/v1/job-status/1234",
}
return
return getEndpoints, postEndpoints, deleteEndpoints
}

func allHandlers(gateway *HandleT) map[string]http.HandlerFunc {
Expand Down
15 changes: 15 additions & 0 deletions warehouse/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ type UploadJob struct {
refreshPartitionBatchSize int
retryTimeWindow time.Duration
minRetryAttempts int
DisableAlter bool

errorHandler ErrorHandler
}
Expand Down Expand Up @@ -196,6 +197,7 @@ func (f *UploadJobFactory) NewUploadJob(ctx context.Context, dto *model.UploadJo
refreshPartitionBatchSize: config.GetInt("Warehouse.refreshPartitionBatchSize", 100),
retryTimeWindow: retryTimeWindow,
minRetryAttempts: minRetryAttempts,
DisableAlter: config.GetBool("Warehouse.disableAlter", false),

alertSender: alerta.NewClient(
config.GetString("ALERTA_URL", "https://alerta.rudderstack.com/api/"),
Expand Down Expand Up @@ -794,6 +796,19 @@ func (job *UploadJob) UpdateTableSchema(tName string, tableSchemaDiff warehouseu
}

func (job *UploadJob) alterColumnsToWarehouse(tName string, columnsMap model.TableSchema) error {
if job.DisableAlter {
pkgLogger.Debugw("skipping alter columns to warehouse",
logfield.SourceID, job.warehouse.Source.ID,
logfield.SourceType, job.warehouse.Source.SourceDefinition.Name,
logfield.DestinationID, job.warehouse.Destination.ID,
logfield.DestinationType, job.warehouse.Destination.DestinationDefinition.Name,
logfield.WorkspaceID, job.warehouse.WorkspaceID,
logfield.TableName, tName,
"columns", columnsMap,
)
return nil
}

var responseToAlerta []model.AlterTableResponse
var errs []error

Expand Down

0 comments on commit c23ffb2

Please sign in to comment.